Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions src/core/qlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ string QList::Pop(Where where) {
}

/* The head and tail should never be compressed */
DCHECK(node->encoding != QUICKLIST_NODE_ENCODING_LZF);
DCHECK_EQ(node->encoding, QUICKLIST_NODE_ENCODING_RAW);
DCHECK(head_->prev->next == nullptr);

string res;
Expand Down Expand Up @@ -1151,6 +1151,17 @@ void QList::DelNode(Node* node) {
* now have compressed nodes needing to be decompressed. */
CompressByDepth(NULL);

// Head and tail must always be uncompressed. A deletion may promote a
// ZSTD-compressed interior node to head or tail.
if (head_) {
if (head_->IsCompressed()) {
malloc_size_ += TryDecompressInternal(false, head_);
}
if (head_->prev->IsCompressed()) {
malloc_size_ += TryDecompressInternal(false, head_->prev);
}
}

zfree(node->entry);
zfree(node);
}
Expand Down Expand Up @@ -1526,28 +1537,25 @@ bool QList::TrainZstdDict() {
void QList::CompressWithZstdDict() {
DCHECK(tl_zstd_dict);

// Bulk-compress all interior nodes.
// Bulk-compress all interior nodes, tracking memory delta.
bool any_compressed = false;
bool any_attempted = false;
for (Node* node = head_; node; node = node->next) {
if (node == head_ || node->next == nullptr)
continue;
any_attempted = true;
if (CompressNodeWithDict(node))
if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && node->sz >= MIN_COMPRESS_BYTES)
any_attempted = true;
size_t prev_size = zmalloc_usable_size(node->entry);
if (CompressNodeWithDict(node)) {
any_compressed = true;
malloc_size_ += zmalloc_usable_size(node->entry) - prev_size;
Comment thread
romange marked this conversation as resolved.
Comment thread
romange marked this conversation as resolved.
}
}

// Only mark failure if we actually tried to compress nodes and all failed.
if (any_attempted && !any_compressed) {
dict_compress_failed_ = 1;
}
Comment thread
romange marked this conversation as resolved.

// Recalculate malloc_size_.
malloc_size_ = 0;
for (Node* node = head_; node; node = node->next) {
malloc_size_ += zmalloc_usable_size(node->entry) + sizeof(Node);
}
malloc_size_ += znallocx(sizeof(QList));
}

bool QList::CompressNodeWithDict(Node* node) {
Expand All @@ -1558,15 +1566,20 @@ bool QList::CompressNodeWithDict(Node* node) {
if (node->sz < MIN_COMPRESS_BYTES)
return false;

stats.compression_attempts++;

size_t bound = ZSTD_compressBound(node->sz);
quicklistLZF* dest = (quicklistLZF*)zmalloc(sizeof(quicklistLZF) + bound);
ZSTD_CCtx_reset(tl_zstd_dict->cctx, ZSTD_reset_session_only);
size_t csz = ZSTD_compress_usingCDict(tl_zstd_dict->cctx, dest->compressed, bound, node->entry,
node->sz, tl_zstd_dict->cdict);
CHECK(!ZSTD_isError(csz)) << ZSTD_getErrorName(csz);

if (csz + MIN_COMPRESS_IMPROVE >= node->sz) {
// Reject if absolute improvement is too small or ratio is not good enough.
// The ratio check (30% savings required) avoids storing incompressible blobs.
if (csz + MIN_COMPRESS_IMPROVE >= node->sz || csz > node->sz * 7 / 10) {
Comment thread
romange marked this conversation as resolved.
zfree(dest);
stats.bad_compression_attempts++;
return false;
}

Expand Down
86 changes: 86 additions & 0 deletions src/core/qlist_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <gmock/gmock.h>
#include <mimalloc.h>

#include <random>

#include "base/gtest.h"
#include "base/logging.h"
#include "core/mi_memory_resource.h"
Expand Down Expand Up @@ -1173,6 +1175,29 @@ TEST_F(QListZstdTest, PopAfterCompress) {
EXPECT_FALSE(tail.empty());
}

// Pops from HEAD until one whole node has been removed, then checks that the
// list still yields a non-trivial element from its new head node.
TEST_F(QListZstdTest, PopDrainsHeadNode) {
  QList ql(-1, 0);  // fill=-1 means 4KB nodes
  ql.set_compr_threshold(1);
  PopulateWithCeleryData(ql, 500);

  const unsigned nodes_before = ql.node_count();
  ASSERT_GE(nodes_before, 3u);

  // Drain the head node: keep popping from HEAD until the node count changes.
  // The node that takes over as head is expected to have been a compressed
  // interior node before the promotion.
  do {
    string popped = ql.Pop(QList::HEAD);
    ASSERT_FALSE(popped.empty());
  } while (ql.node_count() == nodes_before);

  // Exactly one node (the old head) must be gone.
  EXPECT_EQ(ql.node_count(), nodes_before - 1);

  // One more pop reads from the promoted head; it has to come back as a
  // valid, decompressed payload.
  string from_new_head = ql.Pop(QList::HEAD);
  EXPECT_FALSE(from_new_head.empty());
  EXPECT_GT(from_new_head.size(), 100u);
}

TEST_F(QListZstdTest, SmallListSkipped) {
QList ql(-2, 0); // compress=0 so ZSTD path is active (LZF disabled)
PopulateWithCeleryData(ql, 5);
Expand Down Expand Up @@ -1244,4 +1269,65 @@ TEST_F(QListZstdTest, IncrementalCompression) {
EXPECT_EQ(count, ql.Size());
}

// Pushes pseudo-random blobs into a list after a dictionary has been trained on
// Celery data, and checks that compression is attempted, that at least some
// attempts are counted as bad, and that every element is still iterable.
TEST_F(QListZstdTest, IncompressibleDataNotCompressed) {
  // Warm-up list: filling it with compressible Celery data is expected to
  // leave a trained dictionary behind in thread-local state.
  QList ql_train(-1, 0);
  ql_train.set_compr_threshold(1);
  PopulateWithCeleryData(ql_train, 500);

  // The list under test receives only random (incompressible) payloads.
  QList ql(-1, 0);
  ql.set_compr_threshold(1);

  // Snapshot the global counters so only this test's contribution is measured.
  const auto attempts_before = QList::stats.compression_attempts;
  const auto bad_before = QList::stats.bad_compression_attempts;

  // Fixed seed keeps the generated blobs reproducible. Random bytes should not
  // compress well with the Celery-trained dictionary.
  std::mt19937 rng(42);
  for (unsigned blob_idx = 0; blob_idx < 200; ++blob_idx) {
    string random_blob(512, '\0');
    for (size_t pos = 0; pos < random_blob.size(); ++pos) {
      random_blob[pos] = static_cast<char>(rng() % 256);
    }
    ql.Push(random_blob, QList::TAIL);
  }

  // Compression must have been attempted, and some attempts must have been rejected.
  uint64_t attempts_delta = QList::stats.compression_attempts - attempts_before;
  uint64_t bad_delta = QList::stats.bad_compression_attempts - bad_before;
  EXPECT_GT(attempts_delta, 0u);
  EXPECT_GT(bad_delta, 0u);

  // Data integrity: a full iteration has to visit as many entries as Size() reports.
  unsigned visited = 0;
  ql.Iterate(
      [&](const QList::Entry&) {
        ++visited;
        return true;
      },
      0, -1);
  EXPECT_EQ(visited, ql.Size());
}

// Checks the compression counters after populating a list with Celery data:
// every attempt is accounted for as either a dictionary compression or a bad
// attempt, and successes outnumber rejections.
TEST_F(QListZstdTest, StatsTracking) {
  // Baseline of the global counters, taken before any work is done.
  const auto bad_before = QList::stats.bad_compression_attempts;
  const auto successes_before = QList::stats.zstd_dict_compressions;
  const auto attempts_before = QList::stats.compression_attempts;

  QList ql(-1, 0);
  ql.set_compr_threshold(1);
  PopulateWithCeleryData(ql, 500);

  // Per-test deltas relative to the baseline.
  const uint64_t attempted = QList::stats.compression_attempts - attempts_before;
  const uint64_t succeeded = QList::stats.zstd_dict_compressions - successes_before;
  const uint64_t rejected = QList::stats.bad_compression_attempts - bad_before;

  EXPECT_GT(attempted, 0u);
  EXPECT_GT(succeeded, 0u);
  // Each attempt ends up in exactly one of the two outcome counters.
  EXPECT_EQ(attempted, succeeded + rejected);
  // For Celery data, compression should be very effective.
  EXPECT_GT(succeeded, rejected);
}

} // namespace dfly
14 changes: 14 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,20 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
AppendMetricValue("list_reads", m.qlist_stats.interior_node_reads, {"type"}, {"interior"},
&resp->body());

AppendMetricHeader("list_compression_attempts", "List compression attempts", MetricType::COUNTER,
&resp->body());
AppendMetricValue("list_compression_attempts", m.qlist_stats.compression_attempts, {"type"},
{"total"}, &resp->body());
AppendMetricValue("list_compression_attempts", m.qlist_stats.bad_compression_attempts, {"type"},
{"fail"}, &resp->body());

AppendMetricHeader("list_compressed_bytes", "List compressed bytes", MetricType::GAUGE,
&resp->body());
AppendMetricValue("list_compressed_bytes", m.qlist_stats.compressed_bytes, {"type"},
{"compressed"}, &resp->body());
AppendMetricValue("list_compressed_bytes", m.qlist_stats.raw_compressed_bytes, {"type"}, {"raw"},
&resp->body());

// Tiered metrics
{
AppendMetricWithoutLabels("tiered_entries", "Tiered entries", total.tiered_entries,
Expand Down
Loading