diff --git a/src/core/qlist.cc b/src/core/qlist.cc index d8054c32d4c2..8d7acf4f2389 100644 --- a/src/core/qlist.cc +++ b/src/core/qlist.cc @@ -571,7 +571,7 @@ string QList::Pop(Where where) { } /* The head and tail should never be compressed */ - DCHECK(node->encoding != QUICKLIST_NODE_ENCODING_LZF); + DCHECK_EQ(node->encoding, QUICKLIST_NODE_ENCODING_RAW); DCHECK(head_->prev->next == nullptr); string res; @@ -1151,6 +1151,17 @@ void QList::DelNode(Node* node) { * now have compressed nodes needing to be decompressed. */ CompressByDepth(NULL); + // Head and tail must always be uncompressed. A deletion may promote a + // ZSTD-compressed interior node to head or tail. + if (head_) { + if (head_->IsCompressed()) { + malloc_size_ += TryDecompressInternal(false, head_); + } + if (head_->prev->IsCompressed()) { + malloc_size_ += TryDecompressInternal(false, head_->prev); + } + } + zfree(node->entry); zfree(node); } @@ -1526,28 +1537,25 @@ bool QList::TrainZstdDict() { void QList::CompressWithZstdDict() { DCHECK(tl_zstd_dict); - // Bulk-compress all interior nodes. + // Bulk-compress all interior nodes, tracking memory delta. bool any_compressed = false; bool any_attempted = false; for (Node* node = head_; node; node = node->next) { if (node == head_ || node->next == nullptr) continue; - any_attempted = true; - if (CompressNodeWithDict(node)) + if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && node->sz >= MIN_COMPRESS_BYTES) + any_attempted = true; + size_t prev_size = zmalloc_usable_size(node->entry); + if (CompressNodeWithDict(node)) { any_compressed = true; + malloc_size_ += zmalloc_usable_size(node->entry) - prev_size; + } } // Only mark failure if we actually tried to compress nodes and all failed. if (any_attempted && !any_compressed) { dict_compress_failed_ = 1; } - - // Recalculate malloc_size_. - malloc_size_ = 0; - for (Node* node = head_; node; node = node->next) { - malloc_size_ += zmalloc_usable_size(node->entry) + sizeof(Node); - } - malloc_size_ += znallocx(sizeof(QList)); } bool QList::CompressNodeWithDict(Node* node) { @@ -1558,6 +1566,8 @@ bool QList::CompressNodeWithDict(Node* node) { if (node->sz < MIN_COMPRESS_BYTES) return false; + stats.compression_attempts++; + size_t bound = ZSTD_compressBound(node->sz); quicklistLZF* dest = (quicklistLZF*)zmalloc(sizeof(quicklistLZF) + bound); ZSTD_CCtx_reset(tl_zstd_dict->cctx, ZSTD_reset_session_only); @@ -1565,8 +1575,11 @@ bool QList::CompressNodeWithDict(Node* node) { node->sz, tl_zstd_dict->cdict); CHECK(!ZSTD_isError(csz)) << ZSTD_getErrorName(csz); - if (csz + MIN_COMPRESS_IMPROVE >= node->sz) { + // Reject if absolute improvement is too small or ratio is not good enough. + // The ratio check (30% savings required) avoids storing incompressible blobs. + if (csz + MIN_COMPRESS_IMPROVE >= node->sz || csz > node->sz * 7 / 10) { zfree(dest); + stats.bad_compression_attempts++; return false; } diff --git a/src/core/qlist_test.cc b/src/core/qlist_test.cc index 1afaa8f4aef3..29f887a2425b 100644 --- a/src/core/qlist_test.cc +++ b/src/core/qlist_test.cc @@ -10,6 +10,8 @@ #include #include +#include + #include "base/gtest.h" #include "base/logging.h" #include "core/mi_memory_resource.h" @@ -1173,6 +1175,29 @@ TEST_F(QListZstdTest, PopAfterCompress) { EXPECT_FALSE(tail.empty()); } +TEST_F(QListZstdTest, PopDrainsHeadNode) { + QList ql(-1, 0); // fill=-1 means 4KB nodes + ql.set_compr_threshold(1); + PopulateWithCeleryData(ql, 500); + + unsigned initial_nodes = ql.node_count(); + ASSERT_GE(initial_nodes, 3u); + + // Pop enough elements from HEAD to delete the head node entirely, + // promoting a formerly-compressed interior node to head. + while (ql.node_count() == initial_nodes) { + string val = ql.Pop(QList::HEAD); + ASSERT_FALSE(val.empty()); + } + // Head node was deleted and a new head was promoted. + EXPECT_EQ(ql.node_count(), initial_nodes - 1); + + // Continue popping — the new head must be decompressed and valid. + string val = ql.Pop(QList::HEAD); + EXPECT_FALSE(val.empty()); + EXPECT_GT(val.size(), 100u); +} + TEST_F(QListZstdTest, SmallListSkipped) { QList ql(-2, 0); // compress=0 so ZSTD path is active (LZF disabled) PopulateWithCeleryData(ql, 5); @@ -1244,4 +1269,65 @@ TEST_F(QListZstdTest, IncrementalCompression) { EXPECT_EQ(count, ql.Size()); } +TEST_F(QListZstdTest, IncompressibleDataNotCompressed) { + // Train a dictionary with compressible Celery data. + QList ql_train(-1, 0); + ql_train.set_compr_threshold(1); + PopulateWithCeleryData(ql_train, 500); + + // Dictionary is now trained in thread-local state. + // Create a new list with random (incompressible) data. + QList ql(-1, 0); + ql.set_compr_threshold(1); + + auto initial_bad = QList::stats.bad_compression_attempts; + auto initial_attempts = QList::stats.compression_attempts; + + // Push random binary data - should not compress well with the Celery-trained dict. + std::mt19937 rng(42); + for (unsigned i = 0; i < 200; ++i) { + string random_blob(512, '\0'); + for (auto& c : random_blob) { + c = static_cast(rng() % 256); + } + ql.Push(random_blob, QList::TAIL); + } + + // Verify that compression was attempted but mostly rejected. + uint64_t attempts = QList::stats.compression_attempts - initial_attempts; + uint64_t bad = QList::stats.bad_compression_attempts - initial_bad; + EXPECT_GT(attempts, 0u); + EXPECT_GT(bad, 0u); + + // Verify data integrity. + unsigned count = 0; + ql.Iterate( + [&](const QList::Entry& e) { + ++count; + return true; + }, + 0, -1); + EXPECT_EQ(count, ql.Size()); +} + +TEST_F(QListZstdTest, StatsTracking) { + auto initial_attempts = QList::stats.compression_attempts; + auto initial_successes = QList::stats.zstd_dict_compressions; + auto initial_bad = QList::stats.bad_compression_attempts; + + QList ql(-1, 0); + ql.set_compr_threshold(1); + PopulateWithCeleryData(ql, 500); + + uint64_t attempts = QList::stats.compression_attempts - initial_attempts; + uint64_t successes = QList::stats.zstd_dict_compressions - initial_successes; + uint64_t bad = QList::stats.bad_compression_attempts - initial_bad; + + EXPECT_GT(attempts, 0u); + EXPECT_GT(successes, 0u); + EXPECT_EQ(attempts, successes + bad); + // For Celery data, compression should be very effective. + EXPECT_GT(successes, bad); +} + } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 94129a94e93d..7d9d37fc80af 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2054,6 +2054,20 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd AppendMetricValue("list_reads", m.qlist_stats.interior_node_reads, {"type"}, {"interior"}, &resp->body()); + AppendMetricHeader("list_compression_attempts", "List compression attempts", MetricType::COUNTER, + &resp->body()); + AppendMetricValue("list_compression_attempts", m.qlist_stats.compression_attempts, {"type"}, + {"total"}, &resp->body()); + AppendMetricValue("list_compression_attempts", m.qlist_stats.bad_compression_attempts, {"type"}, + {"fail"}, &resp->body()); + + AppendMetricHeader("list_compressed_bytes", "List compressed bytes", MetricType::GAUGE, + &resp->body()); + AppendMetricValue("list_compressed_bytes", m.qlist_stats.compressed_bytes, {"type"}, + {"compressed"}, &resp->body()); + AppendMetricValue("list_compressed_bytes", m.qlist_stats.raw_compressed_bytes, {"type"}, {"raw"}, + &resp->body()); + // Tiered metrics { AppendMetricWithoutLabels("tiered_entries", "Tiered entries", total.tiered_entries,