Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cpp/src/arrow/util/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/result.h"
#include "arrow/status.h"
Expand Down Expand Up @@ -200,11 +201,16 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
codec = internal::MakeLz4HadoopRawCodec();
#endif
break;
case Compression::ZSTD:
case Compression::ZSTD: {
#ifdef ARROW_WITH_ZSTD
codec = internal::MakeZSTDCodec(compression_level);
auto opt = dynamic_cast<const ZstdCodecOptions*>(&codec_options);
codec = internal::MakeZSTDCodec(
compression_level,
opt ? opt->compression_context_params : std::vector<std::pair<int, int>>{},
opt ? opt->decompression_context_params : std::vector<std::pair<int, int>>{});
#endif
break;
}
case Compression::BZ2:
#ifdef ARROW_WITH_BZ2
codec = internal::MakeBZ2Codec(compression_level);
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/util/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "arrow/result.h"
#include "arrow/status.h"
Expand Down Expand Up @@ -142,6 +144,16 @@ class ARROW_EXPORT BrotliCodecOptions : public CodecOptions {
std::optional<int> window_bits;
};

// ----------------------------------------------------------------------
// Zstd codec options implementation

/// \brief Codec options specific to the Zstd codec.
///
/// Extends CodecOptions with two lists of (key, value) integer pairs, one for
/// the compression side and one for the decompression side. Both lists are
/// empty by default.
class ARROW_EXPORT ZstdCodecOptions : public CodecOptions {
public:
// Valid keys can be found at https://facebook.github.io/zstd/zstd_manual.html.
// Each entry is a (parameter key, parameter value) pair.
// NOTE(review): presumably forwarded to the zstd compression context when the
// codec is created -- confirm against the Zstd codec implementation.
std::vector<std::pair<int, int>> compression_context_params;
// Same (key, value) layout as above, for the decompression side.
// NOTE(review): presumably applied to the zstd decompression context -- confirm.
std::vector<std::pair<int, int>> decompression_context_params;
};

/// \brief Compression codec
class ARROW_EXPORT Codec {
public:
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/util/compression_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#pragma once

#include <memory>
#include <utility>
#include <vector>

#include "arrow/util/compression.h" // IWYU pragma: export

Expand Down Expand Up @@ -74,7 +76,9 @@ std::unique_ptr<Codec> MakeLz4HadoopRawCodec();
constexpr int kZSTDDefaultCompressionLevel = 1;

std::unique_ptr<Codec> MakeZSTDCodec(
int compression_level = kZSTDDefaultCompressionLevel);
int compression_level = kZSTDDefaultCompressionLevel,
std::vector<std::pair<int, int>> compression_context_params = {},
std::vector<std::pair<int, int>> decompression_context_params = {});

} // namespace internal
} // namespace util
Expand Down
216 changes: 166 additions & 50 deletions cpp/src/arrow/util/compression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
// under the License.

#include <algorithm>
#include <concepts>
#include <cstdint>
#include <cstring>
#include <memory>
#include <ostream>
#include <random>
#include <span>
#include <string>
#include <utility>
#include <vector>

#include <gtest/gtest.h>
Expand Down Expand Up @@ -446,36 +449,17 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) {
}
}

TEST(TestCodecMisc, SpecifyCodecOptionsGZip) {
// for now only GZIP & Brotli codec options supported, since it has specific parameters
// to be customized, other codecs could directly go with CodecOptions, could add more
// specific codec options if needed.
struct CombinationOption {
int level;
GZipFormat format;
int window_bits;
bool expect_success;
};
constexpr CombinationOption combinations[] = {{2, GZipFormat::ZLIB, 12, true},
{9, GZipFormat::GZIP, 9, true},
{9, GZipFormat::GZIP, 20, false},
{5, GZipFormat::DEFLATE, -12, false},
{-992, GZipFormat::GZIP, 15, false}};

template <std::derived_from<arrow::util::CodecOptions> T>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat, a concept 😀

void CheckSpecifyCodecOptions(Compression::type compression,
std::span<const std::pair<T, bool>> options) {
std::vector<uint8_t> data = MakeRandomData(2000);
for (const auto& combination : combinations) {
const auto compression = Compression::GZIP;
for (const auto& [codec_option, expect_success] : options) {
if (!Codec::IsAvailable(compression)) {
// Support for this codec hasn't been built
continue;
}
auto codec_options = arrow::util::GZipCodecOptions();
codec_options.compression_level = combination.level;
codec_options.gzip_format = combination.format;
codec_options.window_bits = combination.window_bits;
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
auto result1 = Codec::Create(compression, codec_option);
auto result2 = Codec::Create(compression, codec_option);
ASSERT_EQ(expect_success, result1.ok());
ASSERT_EQ(expect_success, result2.ok());
if (expect_success) {
Expand All @@ -484,37 +468,169 @@ TEST(TestCodecMisc, SpecifyCodecOptionsGZip) {
}
}

// Checks a table of GZip option combinations: each entry pairs an options object
// with whether creating a codec from it is expected to succeed.
TEST(TestCodecMisc, SpecifyCodecOptionsGZip) {
  using GZipCase = std::pair<arrow::util::GZipCodecOptions, bool>;

  // Assemble a GZipCodecOptions from a level, a container format and a window size.
  const auto gzip_options = [](int level, GZipFormat fmt, std::optional<int> bits) {
    arrow::util::GZipCodecOptions opts;
    opts.compression_level = level;
    opts.gzip_format = fmt;
    opts.window_bits = bits;
    return opts;
  };

  const GZipCase cases[]{
      {gzip_options(5, GZipFormat::GZIP, 15), true},
      {gzip_options(9, GZipFormat::ZLIB, 12), true},
      {gzip_options(-1, GZipFormat::DEFLATE, 10), true},
      {gzip_options(10, GZipFormat::GZIP, 25), false},
      {gzip_options(-992, GZipFormat::GZIP, 15), false},
  };

  CheckSpecifyCodecOptions<arrow::util::GZipCodecOptions>(Compression::GZIP, cases);
}

TEST(TestCodecMisc, SpecifyCodecOptionsBrotli) {
// for now only GZIP & Brotli codec options supported, since it has specific parameters
// to be customized, other codecs could directly go with CodecOptions, could add more
// specific codec options if needed.
struct CombinationOption {
int level;
int window_bits;
bool expect_success;
auto make_option = [](int compression_level, std::optional<int> window_bits) {
arrow::util::BrotliCodecOptions option;
option.compression_level = compression_level;
option.window_bits = window_bits;
return option;
};
constexpr CombinationOption combinations[] = {
{8, 22, true}, {11, 10, true}, {1, 24, true}, {5, -12, false}, {-992, 25, false}};
const std::pair<arrow::util::BrotliCodecOptions, bool> options[]{
{make_option(8, 22), true}, {make_option(11, 10), true},
{make_option(1, 24), true}, {make_option(5, -12), false},
{make_option(-992, 25), false},
};
CheckSpecifyCodecOptions<arrow::util::BrotliCodecOptions>(Compression::BROTLI, options);
}

std::vector<uint8_t> data = MakeRandomData(2000);
for (const auto& combination : combinations) {
const auto compression = Compression::BROTLI;
if (!Codec::IsAvailable(compression)) {
// Support for this codec hasn't been built
continue;
// Checks a table of Zstd option combinations: each entry pairs an options object
// with whether creating a codec from it is expected to succeed.
TEST(TestCodecMisc, SpecifyCodecOptionsZstd) {
  using ContextParams = std::vector<std::pair<int, int>>;
  using ZstdCase = std::pair<arrow::util::ZstdCodecOptions, bool>;

  // Numeric key used for the window-log compression parameter in this test.
  constexpr int ZSTD_c_windowLog = 101;

  // Assemble a ZstdCodecOptions from a level and the two context parameter lists.
  const auto zstd_options = [](int level, ContextParams cparams, ContextParams dparams) {
    arrow::util::ZstdCodecOptions opts;
    opts.compression_level = level;
    opts.compression_context_params = std::move(cparams);
    opts.decompression_context_params = std::move(dparams);
    return opts;
  };

  const ZstdCase cases[]{
      {zstd_options(2, {}, {}), true},
      {zstd_options(9, {}, {}), true},
      {zstd_options(15, {}, {}), true},
      {zstd_options(-992, {}, {}), true},
      {zstd_options(3, {{ZSTD_c_windowLog, 23}}, {}), true},
      {zstd_options(3, {{ZSTD_c_windowLog, 28}}, {}), true},
  };

  CheckSpecifyCodecOptions<arrow::util::ZstdCodecOptions>(Compression::ZSTD, cases);
}

// Verifies that setting the window-log compression parameter to 28 produces a
// smaller one-shot compressed output than default options, on input made of a
// 4 MiB random block followed by an exact copy of itself.
TEST(TestCodecMisc, ZstdLargerWindowLog) {
  constexpr int ZSTD_c_windowLog = 101;

  arrow::util::ZstdCodecOptions option1;
  arrow::util::ZstdCodecOptions option2;
  option2.compression_context_params = {{ZSTD_c_windowLog, 28}};

  std::vector<uint8_t> data = MakeRandomData(4 * 1024 * 1024);
  // Duplicate the payload so the second half repeats the first. This is done with
  // resize + copy_n rather than data.insert(data.end(), data.begin(), data.end()):
  // the range overload of insert() has undefined behaviour when the source
  // iterators point into the same vector, even if capacity was reserved.
  const auto half = data.size();
  data.resize(half * 2);
  std::copy_n(data.data(), half, data.data() + half);

  // One-shot compression of `data` with a codec built from the given options;
  // returns the compressed bytes trimmed to their actual size.
  auto compress = [&data](const arrow::util::ZstdCodecOptions& codecOption)
      -> Result<std::vector<uint8_t>> {
    ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(Compression::ZSTD, codecOption));
    auto max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
    std::vector<uint8_t> compressed(max_compressed_len);

    ARROW_ASSIGN_OR_RAISE(
        auto actual_size,
        codec->Compress(data.size(), data.data(), max_compressed_len, compressed.data()));
    compressed.resize(actual_size);
    return compressed;
  };

  ASSERT_OK_AND_ASSIGN(auto compressed1, compress(option1));
  ASSERT_OK_AND_ASSIGN(auto compressed2, compress(option2));
  // NOTE(review): presumably the default window is too small to reach back to the
  // first copy of the payload, so only the larger window can exploit the repeat.
  ASSERT_GT(compressed1.size(), compressed2.size());
}

TEST(TestCodecMisc, ZstdStreamLargerWindowLog) {
constexpr int ZSTD_c_windowLog = 101;
constexpr int ZSTD_d_windowLogMax = 100;

arrow::util::ZstdCodecOptions option1;
arrow::util::ZstdCodecOptions option2;
option2.compression_context_params = {{ZSTD_c_windowLog, 28}};
option2.decompression_context_params = {{ZSTD_d_windowLogMax, 28}};

std::vector<uint8_t> data = MakeRandomData(4 * 1024 * 1024);
data.reserve(data.size() * 2);
data.insert(data.end(), data.begin(), data.end());

ASSERT_OK_AND_ASSIGN(auto codec1, Codec::Create(Compression::ZSTD, option1));
ASSERT_OK_AND_ASSIGN(auto codec2, Codec::Create(Compression::ZSTD, option2));

auto compress = [&data](Codec& codec) -> Result<std::vector<uint8_t>> {
auto max_compressed_len = codec.MaxCompressedLen(data.size(), data.data());
std::vector<uint8_t> compressed(max_compressed_len);

int64_t bytes_written = 0;
int64_t bytes_read = 0;
ARROW_ASSIGN_OR_RAISE(auto compressor, codec.MakeCompressor());
while (bytes_read < static_cast<int64_t>(data.size())) {
ARROW_ASSIGN_OR_RAISE(
auto result,
compressor->Compress(data.size() - bytes_read, data.data() + bytes_read,
max_compressed_len - bytes_written,
compressed.data() + bytes_written));
bytes_written += result.bytes_written;
bytes_read += result.bytes_read;
}
auto codec_options = arrow::util::BrotliCodecOptions();
codec_options.compression_level = combination.level;
codec_options.window_bits = combination.window_bits;
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
ASSERT_EQ(expect_success, result1.ok());
ASSERT_EQ(expect_success, result2.ok());
if (expect_success) {
CheckCodecRoundtrip(*result1, *result2, data);
while (true) {
ARROW_ASSIGN_OR_RAISE(auto result,
compressor->End(max_compressed_len - bytes_written,
compressed.data() + bytes_written));
bytes_written += result.bytes_written;
if (!result.should_retry) {
break;
}
}
compressed.resize(bytes_written);
return compressed;
};

ASSERT_OK_AND_ASSIGN(auto compressed1, compress(*codec1));
ASSERT_OK_AND_ASSIGN(auto compressed2, compress(*codec2));
ASSERT_GT(compressed1.size(), compressed2.size());

ASSERT_OK_AND_ASSIGN(auto decompressor1, codec1->MakeDecompressor());
ASSERT_OK_AND_ASSIGN(auto decompressor2, codec2->MakeDecompressor());

std::vector<uint8_t> decompressed(data.size());
// Using a windowLog greater than ZSTD_WINDOWLOG_LIMIT_DEFAULT (27, i.e. a window of
// 1 << 27 bytes) at the compression stage requires explicitly allowing such a size
// at the streaming decompression stage.
auto ret = decompressor1->Decompress(compressed2.size(), compressed2.data(),
decompressed.size(), decompressed.data());
ASSERT_NOT_OK(ret);
ASSERT_EQ(ret.status().message(),
"ZSTD decompress failed: Frame requires too much memory for decoding");

int64_t bytes_written = 0;
int64_t bytes_read = 0;
while (true) {
ASSERT_OK_AND_ASSIGN(auto result,
decompressor2->Decompress(compressed2.size() - bytes_read,
compressed2.data() + bytes_read,
decompressed.size() - bytes_written,
decompressed.data() + bytes_written));
bytes_read += result.bytes_read;
bytes_written += result.bytes_written;
if (!result.need_more_output) {
break;
}
}
ASSERT_TRUE(decompressor2->IsFinished());
ASSERT_EQ(bytes_read, compressed2.size());
ASSERT_EQ(bytes_written, data.size());
ASSERT_EQ(decompressed, data);
}

TEST_P(CodecTest, MinMaxCompressionLevel) {
Expand Down
Loading
Loading