From f47aca291c0f02682f667f508ea3e83378cd7188 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Mon, 11 Aug 2025 21:28:42 +0000 Subject: [PATCH 1/8] noop: add pq compression-codec with no-op implementation --- .../logstash/ackedqueue/CompressionCodec.java | 24 ++++++++ .../java/org/logstash/ackedqueue/Queue.java | 11 +++- .../org/logstash/ackedqueue/Settings.java | 4 ++ .../org/logstash/ackedqueue/SettingsImpl.java | 17 ++++++ .../ackedqueue/CompressionCodecTest.java | 58 +++++++++++++++++++ 5 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java create mode 100644 logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java new file mode 100644 index 0000000000..47eff479ba --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java @@ -0,0 +1,24 @@ +package org.logstash.ackedqueue; + +public interface CompressionCodec { + + byte[] encode(byte[] data); + byte[] decode(byte[] data); + + /** + * The {@link CompressionCodec#NOOP} is a {@link CompressionCodec} that + * does nothing when encoding and decoding. It is only meant to be activated + * as a safety-latch in the event of compression being broken. + */ + CompressionCodec NOOP = new CompressionCodec() { + @Override + public byte[] encode(byte[] data) { + return data; + } + + @Override + public byte[] decode(byte[] data) { + return data; + } + }; +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 691987793c..de76e412b4 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -83,6 +83,7 @@ public final class Queue implements Closeable { // deserialization private final Class elementClass; private final Method deserializeMethod; + private final CompressionCodec compressionCodec; // thread safety private final ReentrantLock lock = new ReentrantLock(); @@ -112,6 +113,7 @@ public Queue(Settings settings) { this.maxBytes = settings.getQueueMaxBytes(); this.checkpointIO = new FileCheckpointIO(dirPath, settings.getCheckpointRetry()); this.elementClass = settings.getElementClass(); + this.compressionCodec = settings.getCompressionCodec(); this.tailPages = new ArrayList<>(); this.unreadTailPages = new ArrayList<>(); this.closed = new AtomicBoolean(true); // not yet opened @@ -414,7 +416,10 @@ public long write(Queueable element) throws IOException { throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE); } - byte[] data = element.serialize(); + byte[] serializedBytes = element.serialize(); + byte[] data = compressionCodec.encode(serializedBytes); + + logger.trace("serialized: {}->{}", serializedBytes.length, data.length); // the write strategy with regard to the isFull() state is to assume there is space for this element // and write it, then after write verify if we just filled the queue and wait on the notFull condition @@ -767,7 +772,9 @@ public CheckpointIO getCheckpointIO() { */ public Queueable deserialize(byte[] bytes) { try { - return (Queueable)this.deserializeMethod.invoke(this.elementClass, bytes); + byte[] decodedBytes = compressionCodec.decode(bytes); + logger.trace("deserialized: {}->{}", bytes.length, decodedBytes.length); + return (Queueable)this.deserializeMethod.invoke(this.elementClass, decodedBytes); } catch (IllegalAccessException|InvocationTargetException e) { throw new QueueRuntimeException("deserialize invocation error", e); } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java index 1623738659..9c404ef1fc 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java @@ -44,6 +44,8 @@ public interface Settings { boolean getCheckpointRetry(); + CompressionCodec getCompressionCodec(); + /** * Validate and return the settings, or throw descriptive {@link QueueRuntimeException} * @param settings the settings to validate @@ -89,6 +91,8 @@ interface Builder { Builder checkpointRetry(boolean checkpointRetry); + Builder compressionCodec(CompressionCodec compressionCodec); + Settings build(); } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java index bc191f44a3..923217af36 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java @@ -31,6 +31,7 @@ public class SettingsImpl implements Settings { private final int checkpointMaxAcks; private final int checkpointMaxWrites; private final boolean checkpointRetry; + private final CompressionCodec compressionCodec; public static Builder builder(final Settings settings) { return new BuilderImpl(settings); @@ -49,6 +50,7 @@ private SettingsImpl(final BuilderImpl builder) { this.checkpointMaxAcks = builder.checkpointMaxAcks; this.checkpointMaxWrites = builder.checkpointMaxWrites; this.checkpointRetry = builder.checkpointRetry; + this.compressionCodec = builder.compressionCodec; } @Override @@ -91,6 +93,11 @@ public boolean getCheckpointRetry() { return this.checkpointRetry; } + @Override + public CompressionCodec getCompressionCodec() { + return this.compressionCodec; + } + /** * Default implementation for Setting's Builder * */ @@ -140,6 +147,8 @@ private static final class BuilderImpl implements Builder { private boolean checkpointRetry; + private CompressionCodec compressionCodec; + private BuilderImpl(final String dirForFiles) { this.dirForFiles = dirForFiles; this.elementClass = null; @@ -148,6 +157,7 @@ private BuilderImpl(final String dirForFiles) { this.maxUnread = DEFAULT_MAX_UNREAD; this.checkpointMaxAcks = DEFAULT_CHECKPOINT_MAX_ACKS; this.checkpointMaxWrites = DEFAULT_CHECKPOINT_MAX_WRITES; + this.compressionCodec = CompressionCodec.NOOP; this.checkpointRetry = false; } @@ -160,6 +170,7 @@ private BuilderImpl(final Settings settings) { this.checkpointMaxAcks = settings.getCheckpointMaxAcks(); this.checkpointMaxWrites = settings.getCheckpointMaxWrites(); this.checkpointRetry = settings.getCheckpointRetry(); + this.compressionCodec = settings.getCompressionCodec(); } @Override @@ -204,6 +215,12 @@ public Builder checkpointRetry(final boolean checkpointRetry) { return this; } + @Override + public Builder compressionCodec(CompressionCodec compressionCodec) { + this.compressionCodec = compressionCodec; + return this; + } + @Override public Settings build() { return Settings.ensureValid(new SettingsImpl(this)); diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java new file mode 100644 index 0000000000..0a79a0a363 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java @@ -0,0 +1,58 @@ +package org.logstash.ackedqueue; + +import org.apache.logging.log4j.Logger; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Set; +import java.util.zip.Deflater; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.mockito.Matchers.argThat; + +public class CompressionCodecTest { + public static final byte[] RAW_BYTES = ( + "this is a string of text with repeated substrings that is designed to be "+ + "able to be compressed into a string that is smaller than the original input "+ + "so that we can assert that the compression codecs compress strings to be "+ + "smaller than their uncompressed representations").getBytes(); + public static final byte[] DEFLATE_SPEED_BYTES = deflate(RAW_BYTES, Deflater.BEST_SPEED); + public static final byte[] DEFLATE_BALANCED_BYTES = deflate(RAW_BYTES, Deflater.DEFAULT_COMPRESSION); + public static final byte[] DEFLATE_SIZE_BYTES = deflate(RAW_BYTES, Deflater.BEST_COMPRESSION); + + @Test + public void testDisabledCompressionCodecDecodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled"); + assertDecodesRaw(compressionCodec); + + // ensure true pass-through when compression is disabled, even if the payload looks like DEFLATE + assertThat(compressionCodec.decode(DEFLATE_SPEED_BYTES), is(equalTo(DEFLATE_SPEED_BYTES))); + assertThat(compressionCodec.decode(DEFLATE_BALANCED_BYTES), is(equalTo(DEFLATE_BALANCED_BYTES))); + assertThat(compressionCodec.decode(DEFLATE_SIZE_BYTES), is(equalTo(DEFLATE_SIZE_BYTES))); + } + + @Test + public void testDisabledCompressionCodecEncodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled"); + // ensure true pass-through when compression is disabled + assertThat(compressionCodec.encode(RAW_BYTES), is(equalTo(RAW_BYTES))); + } + + public static byte[] deflate(byte[] input, int level) { + final Deflater deflater = new Deflater(level); + try { + deflater.setInput(input); + deflater.finish(); + + // output SHOULD be smaller, but will never be 1kb bigger + byte[] output = new byte[input.length+1024]; + + int compressedLength = deflater.deflate(output); + return Arrays.copyOf(output, compressedLength); + } finally { + deflater.end(); + } + } +} \ No newline at end of file From 39c0b5ced85f4d22d828b274b554eae0719e057c Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Fri, 8 Aug 2025 19:31:52 +0000 Subject: [PATCH 2/8] pq: add support for event compression using zstd Adds non-breaking support for event compression to the persisted queue, as configured by a new per-pipeline setting `queue.compression`, which supports: - `none` (default): no compression is performed, but if compressed events are encountered in the queue they will be decompressed - `speed`: compression optimized for speed - `balanced`: compression balancing speed against result size - `size`: compression optimized for maximum reduction of size - `disabled`: compression support entirely disabled; if a pipeline is run in this configuration against a PQ that already contains unacked compressed events, the pipeline WILL crash. To accomplish this, we then provide an abstract base implementation of the CompressionCodec whose decode method is capable of _detecting_ and decoding zstd-encoded payload while letting other payloads through unmodified. The detection is done with an operation on the first four bytes of the payload, so no additional context is needed. An instance of this zstd-aware compression codec is provided with a pass-through encode operation when configured with `queue.compression: none`, which is the default, ensuring that by default logstash is able to decode any event that had previously been written. We provide an additional implementation that is capable of _encoding_ events with a configurable goal: speed, size, or a balance of the two. --- config/logstash.yml | 4 + docker/data/logstash/env2yaml/env2yaml.go | 1 + docs/reference/logstash-settings-file.md | 1 + docs/reference/persistent-queues.md | 6 + logstash-core/build.gradle | 1 + logstash-core/lib/logstash/environment.rb | 1 + logstash-core/lib/logstash/settings.rb | 1 + .../spec/logstash/queue_factory_spec.rb | 1 + .../AbstractZstdAwareCompressionCodec.java | 42 ++++ .../logstash/ackedqueue/CompressionCodec.java | 34 +++ .../logstash/ackedqueue/QueueFactoryExt.java | 12 + .../ackedqueue/ZstdAwareCompressionCodec.java | 18 ++ .../ZstdEnabledCompressionCodec.java | 39 +++ .../common/SettingKeyDefinitions.java | 2 + .../org/logstash/util/CleanerThreadLocal.java | 115 +++++++++ .../ackedqueue/CompressionCodecTest.java | 227 ++++++++++++++++-- .../ackedqueue/ImmutableByteArrayBarrier.java | 20 ++ .../logstash/util/CleanerThreadLocalTest.java | 159 ++++++++++++ .../mixed-compression-queue-data-dir.md | 141 +++++++++++ .../mixed-compression-queue-data-dir.tar.gz | Bin 0 -> 9467 bytes qa/integration/fixtures/pq_drain_spec.yml | 3 + qa/integration/specs/pq_drain_spec.rb | 142 +++++++++++ 22 files changed, 947 insertions(+), 23 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java create mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java create mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java create mode 100644 logstash-core/src/main/java/org/logstash/util/CleanerThreadLocal.java create mode 100644 logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java create mode 100644 logstash-core/src/test/java/org/logstash/util/CleanerThreadLocalTest.java create mode 100644 qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md create mode 100644 qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz create mode 100644 qa/integration/fixtures/pq_drain_spec.yml create mode 100644 qa/integration/specs/pq_drain_spec.rb diff --git a/config/logstash.yml b/config/logstash.yml index e4d008ca5c..4a5be0cb23 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -223,6 +223,10 @@ # # queue.checkpoint.writes: 1024 # +# If using queue.type: persisted, the compression goal. Valid values are `none`, `speed`, `balanced`, and `size`. +# The default `none` is able to decompress previously-written events, even if they were compressed. +# +# queue.compression: none # # ------------ Dead-Letter Queue Settings -------------- # Flag to turn on dead-letter queue. diff --git a/docker/data/logstash/env2yaml/env2yaml.go b/docker/data/logstash/env2yaml/env2yaml.go index 95fc569b23..d1e976bbab 100644 --- a/docker/data/logstash/env2yaml/env2yaml.go +++ b/docker/data/logstash/env2yaml/env2yaml.go @@ -58,6 +58,7 @@ var validSettings = []string{ "queue.checkpoint.acks", "queue.checkpoint.writes", "queue.checkpoint.interval", // remove it for #17155 + "queue.compression", "queue.drain", "dead_letter_queue.enable", "dead_letter_queue.max_bytes", diff --git a/docs/reference/logstash-settings-file.md b/docs/reference/logstash-settings-file.md index 7bcce3c853..b14ac26f13 100644 --- a/docs/reference/logstash-settings-file.md +++ b/docs/reference/logstash-settings-file.md @@ -68,6 +68,7 @@ The `logstash.yml` file includes these settings. | `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 | | `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 | | `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows platform, filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` | +| `queue.compression` | Set a persisted queue compression goal, which allows the pipeline to spend CPU to reduce the serialized size on disk. Acceptable values are `speed`, `balanced`, and `size`. | `none` | | `queue.drain` | When enabled, Logstash waits until the persistent queue (`queue.type: persisted`) is drained before shutting down. | `false` | | `dead_letter_queue.enable` | Flag to instruct Logstash to enable the DLQ feature supported by plugins. | `false` | | `dead_letter_queue.max_bytes` | The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. | `1024mb` | diff --git a/docs/reference/persistent-queues.md b/docs/reference/persistent-queues.md index 8c2bed6639..7694b471fd 100644 --- a/docs/reference/persistent-queues.md +++ b/docs/reference/persistent-queues.md @@ -84,6 +84,12 @@ If you want to define values for a specific pipeline, use [`pipelines.yml`](/ref `queue.checkpoint.interval` {applies_to}`stack: deprecated 9.1` : Sets the interval in milliseconds when a checkpoint is forced on the head page. Default is `1000`. Set to `0` to eliminate periodic checkpoints. +`queue.compression` {applies_to}`stack: ga 9.2` +: Sets the event compression goal for use with the persisted queue. Default is `none`. Acceptable values include: + * `speed`: optimize for fastest compression operation + * `size`: optimize for smallest possible size on disk, spending more CPU + * `balanced`: a balance between the `speed` and `size` settings + ## Configuration notes [pq-config-notes] Every situation and environment is different, and the "ideal" configuration varies. If you optimize for performance, you may increase your risk of losing data. If you optimize for data protection, you may impact performance. diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index 5c0db0f5a3..a45e168f00 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -239,6 +239,7 @@ dependencies { implementation 'commons-codec:commons-codec:1.17.0' // transitively required by httpclient // Jackson version moved to versions.yml in the project root (the JrJackson version is there too) implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" + implementation "com.github.luben:zstd-jni:1.5.7-4" api "com.fasterxml.jackson.core:jackson-databind:${jacksonDatabindVersion}" api "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}" implementation 'org.codehaus.janino:janino:3.1.0' diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 49978fad31..2874c3d6e7 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -95,6 +95,7 @@ def self.as_java_range(r) Setting::NumericSetting.new("queue.checkpoint.writes", 1024), # 0 is unlimited Setting::NumericSetting.new("queue.checkpoint.interval", 1000), # remove it for #17155 Setting::BooleanSetting.new("queue.checkpoint.retry", true), + Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)), Setting::BooleanSetting.new("dead_letter_queue.enable", false), Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"), Setting::NumericSetting.new("dead_letter_queue.flush_interval", 5000), diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index e90d552331..78b63810a7 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -69,6 +69,7 @@ def self.included(base) "queue.checkpoint.interval", # remove it for #17155 "queue.checkpoint.writes", "queue.checkpoint.retry", + "queue.compression", "queue.drain", "queue.max_bytes", "queue.max_events", diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb index 540113412c..5d8dd24a22 100644 --- a/logstash-core/spec/logstash/queue_factory_spec.rb +++ b/logstash-core/spec/logstash/queue_factory_spec.rb @@ -30,6 +30,7 @@ LogStash::Setting::NumericSetting.new("queue.checkpoint.acks", 1024), LogStash::Setting::NumericSetting.new("queue.checkpoint.writes", 1024), LogStash::Setting::BooleanSetting.new("queue.checkpoint.retry", false), + LogStash::Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)), LogStash::Setting::StringSetting.new("pipeline.id", pipeline_id), LogStash::Setting::PositiveIntegerSetting.new("pipeline.batch.size", 125), LogStash::Setting::PositiveIntegerSetting.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java new file mode 100644 index 0000000000..378f0fd06f --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java @@ -0,0 +1,42 @@ +package org.logstash.ackedqueue; + +import com.github.luben.zstd.Zstd; +import org.logstash.util.CleanerThreadLocal; +import org.logstash.util.SetOnceReference; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.ref.Cleaner; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +/** + * Subclasses of {@link AbstractZstdAwareCompressionCodec} are {@link CompressionCodec}s that are capable + * of detecting and decompressing deflate-compressed events. When decoding byte sequences that are NOT + * deflate-compressed, the given bytes are emitted verbatim. + */ +abstract class AbstractZstdAwareCompressionCodec implements CompressionCodec { + @Override + public byte[] decode(byte[] data) { + if (!isZstd(data)) { + return data; + } + try { + return Zstd.decompress(data); + } catch (Exception e) { + throw new RuntimeException("Exception while decoding", e); + } + } + + private static final byte[] ZSTD_FRAME_MAGIC = { (byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD }; + + static boolean isZstd(byte[] data) { + if (data.length < 4) { return false; } + + for (int i = 0; i < 4; i++) { + if (data[i] != ZSTD_FRAME_MAGIC[i]) { return false; } + } + + return true; + } +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java index 47eff479ba..b1f99cf998 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java @@ -1,6 +1,10 @@ package org.logstash.ackedqueue; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + public interface CompressionCodec { + Logger LOGGER = LogManager.getLogger(CompressionCodec.class); byte[] encode(byte[] data); byte[] decode(byte[] data); @@ -21,4 +25,34 @@ public byte[] decode(byte[] data) { return data; } }; + + static CompressionCodec fromConfigValue(final String configValue) { + return fromConfigValue(configValue, LOGGER); + } + + static CompressionCodec fromConfigValue(final String configValue, final Logger logger) { + return switch (configValue) { + case "disabled" -> { + logger.warn("compression support has been disabled"); + yield CompressionCodec.NOOP; + } + case "none" -> { + logger.info("compression support is enabled (read-only)"); + yield ZstdAwareCompressionCodec.getInstance(); + } + case "speed" -> { + logger.info("compression support is enabled (goal: speed)"); + yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SPEED); + } + case "balanced" -> { + logger.info("compression support is enabled (goal: balanced)"); + yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.BALANCED); + } + case "size" -> { + logger.info("compression support is enabled (goal: size)"); + yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SIZE); + } + default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue)); + }; + } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java index 6a10c2a3e7..f437c1898b 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java @@ -24,6 +24,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.jruby.Ruby; import org.jruby.RubyBasicObject; import org.jruby.RubyClass; @@ -63,6 +66,8 @@ public final class QueueFactoryExt extends RubyBasicObject { private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LogManager.getLogger(QueueFactoryExt.class); + public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); } @@ -123,6 +128,13 @@ private static Settings extractQueueSettings(final IRubyObject settings) { .checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class)) .checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue()) .queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class)) + .compressionCodec(extractConfiguredCodec(settings)) .build(); } + + private static CompressionCodec extractConfiguredCodec(final IRubyObject settings) { + final ThreadContext context = settings.getRuntime().getCurrentContext(); + final String compressionSetting = getSetting(context, settings, QUEUE_COMPRESSION).asJavaString(); + return CompressionCodec.fromConfigValue(compressionSetting, LOGGER); + } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java new file mode 100644 index 0000000000..f82b4b75f2 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java @@ -0,0 +1,18 @@ +package org.logstash.ackedqueue; + +/** + * A {@link ZstdAwareCompressionCodec} is an {@link CompressionCodec} that can decode deflate-compressed + * bytes, but performs no compression when encoding. + */ +class ZstdAwareCompressionCodec extends AbstractZstdAwareCompressionCodec { + private static final ZstdAwareCompressionCodec INSTANCE = new ZstdAwareCompressionCodec(); + + static ZstdAwareCompressionCodec getInstance() { + return INSTANCE; + } + + @Override + public byte[] encode(byte[] data) { + return data; + } +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java new file mode 100644 index 0000000000..01fb55673a --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java @@ -0,0 +1,39 @@ +package org.logstash.ackedqueue; + +import com.github.luben.zstd.Zstd; + +/** + * A {@link ZstdEnabledCompressionCodec} is a {@link CompressionCodec} that can decode deflate-compressed + * bytes and performs deflate compression when encoding. + */ +class ZstdEnabledCompressionCodec extends AbstractZstdAwareCompressionCodec implements CompressionCodec { + public enum Goal { + FASTEST(-7), + SPEED(-1), + BALANCED(3), + HIGH(14), + SIZE(22), + ; + + private int internalLevel; + + Goal(final int internalLevel) { + this.internalLevel = internalLevel; + } + } + + private final int internalLevel; + + ZstdEnabledCompressionCodec(final Goal internalLevel) { + this.internalLevel = internalLevel.internalLevel; + } + + @Override + public byte[] encode(byte[] data) { + try { + return Zstd.compress(data, internalLevel); + } catch (Exception e) { + throw new RuntimeException("Exception while encoding", e); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java index 7c5c47e316..be113bd0a2 100644 --- a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java +++ b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java @@ -43,4 +43,6 @@ public class SettingKeyDefinitions { public static final String QUEUE_CHECKPOINT_RETRY = "queue.checkpoint.retry"; public static final String QUEUE_MAX_BYTES = "queue.max_bytes"; + + public static final String QUEUE_COMPRESSION = "queue.compression"; } \ No newline at end of file diff --git a/logstash-core/src/main/java/org/logstash/util/CleanerThreadLocal.java b/logstash-core/src/main/java/org/logstash/util/CleanerThreadLocal.java new file mode 100644 index 0000000000..83f6d1289e --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/util/CleanerThreadLocal.java @@ -0,0 +1,115 @@ +package org.logstash.util; + +import java.lang.ref.Cleaner; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * A {@link CleanerThreadLocal} is semantically the same as a {@link ThreadLocal}, except that a clean action + * is called against each object when the {@link CleanerThreadLocal} no longer holds a reference to it. + * @param the external value type + */ +public final class CleanerThreadLocal { + private final ThreadLocal threadLocal; + private final Consumer cleanAction; + private final Cleaner cleaner; + + private CleanerThreadLocal(final Cleaner cleaner, + final Supplier factory, + final Consumer cleanAction) { + this.cleaner = cleaner; + this.cleanAction = cleanAction; + this.threadLocal = ThreadLocal.withInitial(() -> wrap(factory.get())); + } + + public T get() { + return unwrap(threadLocal.get()); + } + + public void set(final T value) { + threadLocal.set(wrap(value)); + } + + public void remove() { + threadLocal.remove(); + } + + private T unwrap(final CleanableHolder holder) { + if (holder == null) { + return null; + } + return holder.getDelegate(); + } + + private CleanableHolder wrap(final T value) { + if (value == null) { + return null; + } + return new CleanableHolder(value, this.cleanAction); + } + + public static Factory withInitial(final Supplier initialValueSupplier) { + return new Factory<>(initialValueSupplier); + } + + public static CleanerThreadLocal withCleanAction(final Consumer cleanAction) { + return new Factory().withCleanAction(cleanAction); + } + + public static CleanerThreadLocal withCleanAction(final Consumer cleanAction, final Cleaner cleaner) { + return new Factory().withCleanAction(cleanAction, cleaner); + } + + public static class Factory { + private final Supplier initialValueSupplier; + + private Factory() { + this.initialValueSupplier = () -> null; + } + + private Factory(final Supplier initialValueSupplier) { + this.initialValueSupplier = Objects.requireNonNull(initialValueSupplier); + } + + public CleanerThreadLocal withCleanAction(final Consumer cleanAction) { + return withCleanAction(cleanAction, Cleaner.create()); + } + + public CleanerThreadLocal withCleanAction(final Consumer cleanAction, final Cleaner cleaner) { + return new CleanerThreadLocal<>(Objects.requireNonNullElseGet(cleaner, Cleaner::create), initialValueSupplier, cleanAction); + } + } + + private class CleanableHolder { + private final T delegate; + + static class CleaningAction implements Runnable { + private final T delegate; + private final Consumer cleanAction; + + CleaningAction(final T delegate, + final Consumer cleanAction) { + this.delegate = delegate; + this.cleanAction = cleanAction; + } + + @Override + public void run() { + if (delegate != null) { + cleanAction.accept(delegate); + } + } + } + + private CleanableHolder(final T delegate, + final Consumer finalizer) { + this.delegate = delegate; + cleaner.register(this, new CleaningAction(delegate, finalizer)); + } + + private T getDelegate() { + return delegate; + } + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java index 0a79a0a363..d1a154bd3d 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java @@ -1,58 +1,239 @@ package org.logstash.ackedqueue; +import com.github.luben.zstd.Zstd; import org.apache.logging.log4j.Logger; import org.junit.Test; import org.mockito.Mockito; +import java.security.SecureRandom; import java.util.Arrays; -import java.util.Set; -import java.util.zip.Deflater; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; +import static org.junit.Assert.assertThrows; import static org.mockito.Matchers.argThat; public class CompressionCodecTest { - public static final byte[] RAW_BYTES = ( + static final ImmutableByteArrayBarrier RAW_BYTES = new ImmutableByteArrayBarrier(( "this is a string of text with repeated substrings that is designed to be "+ "able to be compressed into a string that is smaller than the original input "+ "so that we can assert that the compression codecs compress strings to be "+ - "smaller than their uncompressed representations").getBytes(); - public static final byte[] DEFLATE_SPEED_BYTES = deflate(RAW_BYTES, Deflater.BEST_SPEED); - public static final byte[] DEFLATE_BALANCED_BYTES = deflate(RAW_BYTES, Deflater.DEFAULT_COMPRESSION); - public static final byte[] DEFLATE_SIZE_BYTES = deflate(RAW_BYTES, Deflater.BEST_COMPRESSION); + "smaller than their uncompressed representations").getBytes()); + static final ImmutableByteArrayBarrier COMPRESSED_MINIMAL = new ImmutableByteArrayBarrier(compress(RAW_BYTES.bytes(), -1)); + static final ImmutableByteArrayBarrier COMPRESSED_DEFAULT = new ImmutableByteArrayBarrier(compress(RAW_BYTES.bytes(), 3)); + static final ImmutableByteArrayBarrier COMPRESSED_MAXIMUM = new ImmutableByteArrayBarrier(compress(RAW_BYTES.bytes(), 22)); + + private final CompressionCodec codecDisabled = CompressionCodec.fromConfigValue("disabled"); + private final CompressionCodec codecNone = CompressionCodec.fromConfigValue("none"); + private final CompressionCodec codecSpeed = CompressionCodec.fromConfigValue("speed"); + private final CompressionCodec codecBalanced = CompressionCodec.fromConfigValue("balanced"); + private final CompressionCodec codecSize = CompressionCodec.fromConfigValue("size"); @Test public void testDisabledCompressionCodecDecodes() throws Exception { final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled"); assertDecodesRaw(compressionCodec); - // ensure true pass-through when compression is disabled, even if the payload looks like DEFLATE - assertThat(compressionCodec.decode(DEFLATE_SPEED_BYTES), is(equalTo(DEFLATE_SPEED_BYTES))); - assertThat(compressionCodec.decode(DEFLATE_BALANCED_BYTES), is(equalTo(DEFLATE_BALANCED_BYTES))); - assertThat(compressionCodec.decode(DEFLATE_SIZE_BYTES), is(equalTo(DEFLATE_SIZE_BYTES))); + // ensure true pass-through when compression is disabled, even if the payload looks like ZSTD + assertThat(compressionCodec.decode(COMPRESSED_MINIMAL.bytes()), is(equalTo(COMPRESSED_MINIMAL.bytes()))); + assertThat(compressionCodec.decode(COMPRESSED_DEFAULT.bytes()), is(equalTo(COMPRESSED_DEFAULT.bytes()))); + assertThat(compressionCodec.decode(COMPRESSED_MAXIMUM.bytes()), is(equalTo(COMPRESSED_MAXIMUM.bytes()))); } @Test public void testDisabledCompressionCodecEncodes() throws Exception { final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled"); // ensure true pass-through when compression is disabled - assertThat(compressionCodec.encode(RAW_BYTES), is(equalTo(RAW_BYTES))); + assertThat(compressionCodec.encode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes()))); + } + + @Test + public void testDisabledCompressionCodecLogging() throws Exception { + final Logger mockLogger = Mockito.mock(Logger.class); + CompressionCodec.fromConfigValue("disabled", mockLogger); + Mockito.verify(mockLogger).warn(argThat(stringContainsInOrder("compression support", "disabled"))); + } + + @Test + public void testNoneCompressionCodecDecodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none"); + assertDecodesRaw(compressionCodec); + assertDecodesDeflateAnyLevel(compressionCodec); + assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec); } - public static byte[] deflate(byte[] input, int level) { - final Deflater deflater = new Deflater(level); - try { - deflater.setInput(input); - deflater.finish(); + @Test + public void testNoneCompressionCodecEncodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none"); + assertThat(compressionCodec.encode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes()))); + } - // output SHOULD be smaller, but will never be 1kb bigger - byte[] output = new byte[input.length+1024]; + @Test + public void testNoneCompressionCodecLogging() throws Exception { + final Logger mockLogger = Mockito.mock(Logger.class); + CompressionCodec.fromConfigValue("none", mockLogger); + Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "read-only"))); + } + + @Test + public void testSpeedCompressionCodecDecodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed"); + assertDecodesRaw(compressionCodec); + assertDecodesDeflateAnyLevel(compressionCodec); + assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec); + } + + @Test + public void testSpeedCompressionCodecEncodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed"); + assertEncodesSmallerRoundTrip(compressionCodec); + } - int compressedLength = deflater.deflate(output); - return Arrays.copyOf(output, compressedLength); - } finally { - deflater.end(); + @Test + public void testSpeedCompressionCodecLogging() throws Exception { + final Logger mockLogger = Mockito.mock(Logger.class); + CompressionCodec.fromConfigValue("speed", mockLogger); + Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "speed"))); + } + + @Test + public void testBalancedCompressionCodecDecodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced"); + assertDecodesRaw(compressionCodec); + assertDecodesDeflateAnyLevel(compressionCodec); + assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec); + } + + @Test + public void testBalancedCompressionCodecEncodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced"); + assertEncodesSmallerRoundTrip(compressionCodec); + } + + @Test + public void testBalancedCompressionCodecLogging() throws Exception { + final Logger mockLogger = Mockito.mock(Logger.class); + CompressionCodec.fromConfigValue("balanced", mockLogger); + Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "balanced"))); + } + + @Test + public void testSizeCompressionCodecDecodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size"); + assertDecodesRaw(compressionCodec); + assertDecodesDeflateAnyLevel(compressionCodec); + assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec); + } + + @Test + public void testSizeCompressionCodecEncodes() throws Exception { + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size"); + assertEncodesSmallerRoundTrip(compressionCodec); + } + + @Test + public void testSizeCompressionCodecLogging() throws Exception { + final Logger mockLogger = Mockito.mock(Logger.class); + CompressionCodec.fromConfigValue("size", mockLogger); + Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "size"))); + } + + @Test(timeout=1000) + public void testCompressionCodecDecodeTailTruncated() throws Exception { + final byte[] truncatedInput = copyWithTruncatedTail(COMPRESSED_DEFAULT.bytes(), 32); + + final RuntimeException thrownException = assertThrows(RuntimeException.class, () -> codecNone.decode(truncatedInput)); + assertThat(thrownException.getMessage(), containsString("Exception while decoding")); + final Throwable rootCause = extractRootCause(thrownException); + assertThat(rootCause.getMessage(), containsString("Data corruption detected")); + } + + byte[] copyWithTruncatedTail(final byte[] input, final int tailSize) { + int startIndex = (input.length < tailSize) ? 0 : input.length - tailSize; + + final byte[] result = Arrays.copyOf(input, input.length); + Arrays.fill(result, startIndex, result.length, (byte) 0); + + return result; + } + + @Test(timeout=1000) + public void testCompressionCodecDecodeTailScrambled() throws Exception { + final byte[] scrambledInput = copyWithScrambledTail(COMPRESSED_DEFAULT.bytes(), 32); + + final RuntimeException thrownException = assertThrows(RuntimeException.class, () -> codecNone.decode(scrambledInput)); + assertThat(thrownException.getMessage(), containsString("Exception while decoding")); + final Throwable rootCause = extractRootCause(thrownException); + assertThat(rootCause.getMessage(), anyOf(containsString("Data corruption detected"), containsString("Destination buffer is too small"))); + } + + byte[] copyWithScrambledTail(final byte[] input, final int tailSize) { + final SecureRandom secureRandom = new SecureRandom(); + int startIndex = (input.length < tailSize) ? 0 : input.length - tailSize; + + byte[] randomBytes = new byte[input.length - startIndex]; + secureRandom.nextBytes(randomBytes); + + final byte[] result = Arrays.copyOf(input, input.length); + System.arraycopy(randomBytes, 0, result, startIndex, randomBytes.length); + + return result; + } + + @Test(timeout=1000) + public void testCompressionDecodeTailNullPadded() throws Exception { + final byte[] nullPaddedInput = copyWithNullPaddedTail(COMPRESSED_DEFAULT.bytes(), 32); + + final RuntimeException thrownException = assertThrows(RuntimeException.class, () -> codecNone.decode(nullPaddedInput)); + assertThat(thrownException.getMessage(), containsString("Exception while decoding")); + final Throwable rootCause = extractRootCause(thrownException); + assertThat(rootCause.getMessage(), anyOf(containsString("Unknown frame descriptor"), containsString("Data corruption detected"))); + } + + byte[] copyWithNullPaddedTail(final byte[] input, final int tailSize) { + return Arrays.copyOf(input, Math.addExact(input.length, tailSize)); + } + + Throwable extractRootCause(final Throwable throwable) { + Throwable current; + Throwable cause = throwable; + do { + current = cause; + cause = current.getCause(); + } while (cause != null && cause != current); + return current; + } + + void assertDecodesRaw(final CompressionCodec codec) { + assertThat(codec.decode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes()))); + } + + void assertDecodesDeflateAnyLevel(final CompressionCodec codec) { + // zstd levels range from -7 to 22. + for (int level = -7; level < 22; level++) { + final byte[] compressed = compress(RAW_BYTES.bytes(), level); + assertThat(String.format("zstd level %s (%s bytes)", level, compressed.length), codec.decode(compressed), is(equalTo(RAW_BYTES.bytes()))); } } + + void assertDecodesOutputOfAllKnownCompressionCodecs(final CompressionCodec codec) { + assertThat(codec.decode(codecDisabled.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes()))); + assertThat(codec.decode(codecNone.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes()))); + assertThat(codec.decode(codecSpeed.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes()))); + assertThat(codec.decode(codecBalanced.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes()))); + assertThat(codec.decode(codecSize.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes()))); + } + + void assertEncodesSmallerRoundTrip(final CompressionCodec codec) { + final byte[] input = RAW_BYTES.bytes(); + + final byte[] encoded = codec.encode(input); + assertThat("encoded is smaller", encoded.length, is(lessThan(input.length))); + assertThat("shaped like zstd", AbstractZstdAwareCompressionCodec.isZstd(encoded), is(true)); + assertThat("round trip decode", codec.decode(encoded), is(equalTo(input))); + } + + public static byte[] compress(byte[] input, int level) { + return Zstd.compress(input, level); + } } \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java b/logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java new file mode 100644 index 0000000000..276e4d97db --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java @@ -0,0 +1,20 @@ +package org.logstash.ackedqueue; + +import java.util.Arrays; + +/** + * An {@link ImmutableByteArrayBarrier} provides an immutability shield around a {@code byte[]}. + * It stores an inaccessible copy of the provided {@code byte[]}, and makes copies of that copy + * available via {@link ImmutableByteArrayBarrier#bytes}. + * @param bytes the byte array + */ +public record ImmutableByteArrayBarrier(byte[] bytes) { + public ImmutableByteArrayBarrier(byte[] bytes) { + this.bytes = Arrays.copyOf(bytes, bytes.length); + } + + @Override + public byte[] bytes() { + return Arrays.copyOf(bytes, bytes.length); + } +} diff --git a/logstash-core/src/test/java/org/logstash/util/CleanerThreadLocalTest.java b/logstash-core/src/test/java/org/logstash/util/CleanerThreadLocalTest.java new file mode 100644 index 0000000000..d90937d379 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/util/CleanerThreadLocalTest.java @@ -0,0 +1,159 @@ +package org.logstash.util; + +import org.junit.Test; +import org.hamcrest.FeatureMatcher; +import org.hamcrest.Matcher; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +import static org.junit.Assert.assertTrue; + +public class CleanerThreadLocalTest { + @Test + public void testWithExecutorService() throws Exception { + final int threads = 100; + final int iterations = 10_000; + final ResourceFactory resourceFactory = new ResourceFactory(); + final CleanerThreadLocal threadLocal = CleanerThreadLocal + .withInitial(resourceFactory::create) + .withCleanAction(ResourceFactory.Resource::closeAction); + + assertThat(resourceFactory.getAll(), hasSize(0)); + + final ExecutorService executorService = Executors.newFixedThreadPool(threads); + for (int i = 0; i < iterations; i++) { + executorService.submit(() -> threadLocal.get().incrementAccessCounter()); + } + + // while the threads are alive, we should not have cleaned up + assertThat(resourceFactory.getAll(), allOf( + everyItem(isClosed(is(equalTo(false)))), + everyItem(closeCount(is(equalTo(0)))) + )); + + // shutdown and wait; this should kill the threads and empty the threadlocal + executorService.shutdown(); + assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + + // make sure each of the threads got a resource, and that no resource was accessed cross-threads + assertThat(resourceFactory.getAll(), hasSize(threads)); + assertThat(resourceFactory.getAll(), everyItem(accessThreadOrigins(hasSize(1)))); + + // validate that all iterations completed + Integer totalAccesses = resourceFactory.getAll().stream().map((resource -> resource.accessCounter.get())).reduce(0, Integer::sum); + assertThat(totalAccesses, equalTo(iterations)); + + // ensure that the cleanup actions were executed + bruteForceGC(); + assertThat(resourceFactory.getAll(), allOf( + everyItem(isClosed(is(equalTo(true)))), + everyItem(closeCount(is(equalTo(1)))) + )); + } + + static Matcher isClosed(Matcher closedMatcher) { + return new FeatureMatcher(closedMatcher, "a resource", "closed") { + @Override + protected Boolean featureValueOf(ResourceFactory.Resource resource) { + return resource.isClosed(); + } + }; + } + + + static Matcher closeCount(Matcher closeCountMatcher) { + return new FeatureMatcher(closeCountMatcher, "a resource", "close count") { + @Override + protected Integer featureValueOf(ResourceFactory.Resource resource) { + return resource.closeCounter.get(); + } + }; + } + + static Matcher accessThreadOrigins(Matcher> threadOriginsMatcher) { + return new FeatureMatcher>(threadOriginsMatcher, "a resource", "access thread origins") { + @Override + protected Collection featureValueOf(ResourceFactory.Resource resource) { + return resource.threadIds.get(); + } + }; + } + + private void bruteForceGC() throws Exception { + // technically just a hint to the JVM, + // so we hint many times. + for (int i = 0; i < 100; i++) { + System.gc(); + } + } + + static class ResourceFactory { + private AtomicReference> resources = new AtomicReference<>(Set.of()); + protected AtomicInteger counter = new AtomicInteger(); + + public Resource create() { + final Resource resource = new Resource(); + resources.updateAndGet((existing) -> immutableSetAdd(existing, resource)); + return resource; + } + + public Set getAll() { + return resources.get(); + } + + class Resource { + private final int id = counter.incrementAndGet(); + + private final AtomicInteger accessCounter = new AtomicInteger(); + private final AtomicInteger closeCounter = new AtomicInteger(); + private final AtomicBoolean closed = new AtomicBoolean(); + + private final AtomicReference> threadIds = new AtomicReference<>(Set.of()); + + public int getId() { + return id; + } + + public void incrementAccessCounter() { + if (closed.get()) { + throw new IllegalStateException("closed"); + } + threadIds.updateAndGet((existing) -> immutableSetAdd(existing, getCurrentThreadId())); + accessCounter.incrementAndGet(); + } + + @SuppressWarnings("deprecation") + private static long getCurrentThreadId() { + // [JEP-425](https://openjdk.org/jeps/425) introduced `Thread#threadId()` to replace `Thread#getId()` + // in Java 19 + return Thread.currentThread().getId(); + } + + public void closeAction() { + closeCounter.incrementAndGet(); + closed.set(true); + } + + public boolean isClosed() { + return closed.get(); + } + } + + Set immutableSetAdd(Set existing, T item) { + final Set mutable = new HashSet<>(existing); + mutable.add(item); + return Set.copyOf(mutable); + } + } +} \ No newline at end of file diff --git a/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md new file mode 100644 index 0000000000..6daecabb14 --- /dev/null +++ b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md @@ -0,0 +1,141 @@ +# Summary + +The logstash data directory contains a queue for pipeline `main` containing: + + - ACK'd events (from a page that is not fully-ack'd) + - raw CBOR-encoded events + - zstd-compressed events with different compression goals + +# Pages +~~~ +1 258 821AACAC page.0 CBOR(stringref) +2 343 3BE717E8 page.0 CBOR(stringref) +3 332 3439807A page.0 CBOR(stringref) +4 258 C04209D4 page.0 CBOR(stringref) +5 343 3DFB08E8 page.0 CBOR(stringref) +6 332 44B0315D page.0 CBOR(stringref) +7 258 90D25985 page.0 CBOR(stringref) +8 343 DAFD5712 page.0 CBOR(stringref) +9 332 AB6A81DF page.0 CBOR(stringref) +10 258 157EA7A6 page.0 CBOR(stringref) +11 258 02C0F7A2 page.0 CBOR(stringref) +12 343 0005E8A8 page.0 CBOR(stringref) +13 332 C2DA39EA page.1 CBOR(stringref) +14 258 377D623C page.1 CBOR(stringref) +15 343 9F76657C page.1 CBOR(stringref) +16 332 50B51A98 page.1 CBOR(stringref) +17 258 827848CC page.1 CBOR(stringref) +18 343 8325D121 page.1 CBOR(stringref) +19 332 E1A1378B page.1 CBOR(stringref) +20 258 1BBDAA1A page.1 CBOR(stringref) +21 254 19C85DF6 page.1 ZSTD(258) +22 317 AD5DC7CC page.1 ZSTD(343) +23 325 BB8CE48C page.1 ZSTD(332) +24 254 27D38856 page.1 ZSTD(258) +25 317 67A7D2F3 page.2 ZSTD(343) +26 325 888AF9B2 page.2 ZSTD(332) +27 254 CAA2FDE3 page.2 ZSTD(258) +28 317 2985771A page.2 ZSTD(343) +29 325 89197F51 page.2 ZSTD(332) +30 254 A9E292EE page.2 ZSTD(258) +31 258 243FC2C1 page.2 CBOR(stringref) +32 219 2E2E0BDF page.2 ZSTD(258) +33 261 5ED17F40 page.2 ZSTD(343) +34 280 86BA1E80 page.2 ZSTD(332) +35 218 6A7B8C41 page.2 ZSTD(258) +36 262 08E69C4C page.2 ZSTD(343) +37 277 CD32DEBD page.2 ZSTD(332) +38 218 43101D61 page.2 ZSTD(258) +39 261 A22033DE page.3 ZSTD(343) +40 279 8F1FE0FA page.3 ZSTD(332) +41 218 FF56D05C page.3 ZSTD(258) +42 258 7077981D page.3 CBOR(stringref) +43 343 7748A127 page.3 CBOR(stringref) +44 332 B4A0C82C page.3 CBOR(stringref) +45 258 96FB0308 page.3 CBOR(stringref) +46 343 40B77975 page.3 CBOR(stringref) +47 332 D5571FDC page.3 CBOR(stringref) +48 258 BF3FC517 page.3 CBOR(stringref) +49 343 1BC62146 page.3 CBOR(stringref) +50 332 418FD829 page.3 CBOR(stringref) +51 258 DB40747E page.3 CBOR(stringref) +52 224 7629AF30 page.4 ZSTD(258) +53 264 D450FC21 page.4 ZSTD(343) +54 284 43F91F18 page.4 ZSTD(332) +55 224 C61DB7BA page.4 ZSTD(258) +56 264 F9547DBC page.4 ZSTD(343) +57 281 3DBB71E5 page.4 ZSTD(332) +58 225 8ACDB484 page.4 ZSTD(258) +59 264 8256E2D2 page.4 ZSTD(343) +60 281 D76156A2 page.4 ZSTD(332) +61 225 EDC6147B page.4 ZSTD(258) +62 258 D3AB1EF4 page.4 CBOR(stringref) +63 220 4851D677 page.4 ZSTD(258) +64 225 C8DCE54A page.4 ZSTD(258) +65 251 3D1E0F5F page.4 ZSTD(258) +66 258 1C5637CB page.4 CBOR(stringref) +67 343 09AE6714 page.5 CBOR(stringref) +68 332 4A97AC77 page.5 CBOR(stringref) +69 254 D1E43C69 page.5 ZSTD(258) +70 317 B6A2361D page.5 ZSTD(343) +71 325 A44CE35F page.5 ZSTD(332) +72 225 B69C7923 page.5 ZSTD(258) +73 265 FEBC2D45 page.5 ZSTD(343) +74 286 5FA5C389 page.5 ZSTD(332) +75 221 C36048C0 page.5 ZSTD(258) +76 262 E988C90B page.5 ZSTD(343) +77 280 6C98308C page.5 ZSTD(332) +~~~ + +# CHECKPOINTS + +~~~ +# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.0 +VERSION [ 0001]: 1 +PAGENUM [ 00000000]: 0 +1UNAKPG [ 00000000]: 0 +1UNAKSQ [0000000000000005]: 5 +MINSEQN [0000000000000001]: 1 +ELEMNTS [ 0000000C]: 12 +CHECKSM [ 4AFA3119] +# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.1 +VERSION [ 0001]: 1 +PAGENUM [ 00000001]: 1 +1UNAKPG [ 00000000]: 0 +1UNAKSQ [000000000000000D]: 13 +MINSEQN [000000000000000D]: 13 +ELEMNTS [ 0000000C]: 12 +CHECKSM [ 70829F7B] +# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.2 +VERSION [ 0001]: 1 +PAGENUM [ 00000002]: 2 +1UNAKPG [ 00000000]: 0 +1UNAKSQ [0000000000000019]: 25 +MINSEQN [0000000000000019]: 25 +ELEMNTS [ 0000000E]: 14 +CHECKSM [ 4ABFB50A] +# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.3 +VERSION [ 0001]: 1 +PAGENUM [ 00000003]: 3 +1UNAKPG [ 00000000]: 0 +1UNAKSQ [0000000000000027]: 39 +MINSEQN [0000000000000027]: 39 +ELEMNTS [ 0000000D]: 13 +CHECKSM [ 95B393C6] +# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.4 +VERSION [ 0001]: 1 +PAGENUM [ 00000004]: 4 +1UNAKPG [ 00000000]: 0 +1UNAKSQ [0000000000000034]: 52 +MINSEQN [0000000000000034]: 52 +ELEMNTS [ 0000000F]: 15 +CHECKSM [ 9B602904] +# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.head +VERSION [ 0001]: 1 +PAGENUM [ 00000005]: 5 +1UNAKPG [ 00000000]: 0 +1UNAKSQ [0000000000000043]: 67 +MINSEQN [0000000000000043]: 67 +ELEMNTS [ 0000000B]: 11 +CHECKSM [ B5F33B10] +~~~ \ No newline at end of file diff --git a/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..53a0c4f985159033c7832a54e54890343d887189 GIT binary patch literal 9467 zcmZ9xWmwhU^94$m;2{MOIDn*-lr)D%KtZ}oknZk7NP{5KQqt00lG5GX-QAq{Tz}vD ze{=8K*?Z5-o;}Z+XRig0#zDCQwOFB`pBm!QYS_M_sbOOvV=PoNPaDfvl%=z!Gmlim z`r&WQ&ZuCmd3T!pB3Hvoy@}(fA*ZfMjXpnNnW-gdh2=VX^Pw^-o0kvaoaL+1&z>PP z?l*qlz77jmhiz(kHJ6(TmI^(3J@UK^2yhTzvnfTRK*i^`Y=Ib#F2+W_KpiGRN^ z#@l?1AJ|jjIzNuVo`n%VAag*As)M3JM=Xi|f&yLulMl>>Wa9dhS`cnS+d)3dLrK8) zZH0ap@*Z9q2o0tUBM;&JC1Og>(Z|{K*8~zy6!x4@SW6Nc=KR!&%x@?tz=Xes33y0` z9|$;ZKHfiGewyx{^!FEgDQH}%LfifG?6jxz;^OmA(2jEoJ0&7)gGu!UllO^I8f!JJ zD+96p6clh9huuC0iRf2H!S@)QShZNE*nDC}(cyv$aD_2tN#&x@qN*@#vB(5Q6sf*9 zCnCnQwad@5=U$%}&;vDwot=;O+%jo@w0C`NZ*uC_LKE7M`q_$JD&Le`y2IQK))DYK zyiYdZ9q2U~;&;Fup+O!5QG0t8i(p2HM=D4=od$6Kyy7~!WCACRF( z13(xNGQ>xKGiEU$kULq41l$RQ&aj_m0Z$|tM;B!V*v9g?1#oCFK~q*5Gr+66gLNcm z?&-b~IEOHlLR#nuV*zLE|9KAT1h$*N z2igG2?kfbzAOp|8km$7KCLz!fB?|VVG0G_5^8Zb%Z3l{Lg-l^PFhhbP@e*)pj{pMSn^Hhh zA3)uK{vz+F2E2IEfnB@{W!l&=on|!xkl^NQ1cQ7DB|(nrtb}MjG~sn5np)FVSl)vW z#l{1u@o)6m-smPjVGop*6lFXMQ2D#9ig5i`XGku$1ma$Y>1!)kf)EJ3RYAceodNc- z+Ao1fFG^H-Cc+3l9)kQ|NT8}8<#t?4tY6Xu<8s{rrb4SW@xUbzxC7gFVaq9odLX2>2AuTpWpzk=|sC*Uv@9iXx8ZV8BpVMwD@KGKiVlilI04h$*BLO5# z6WKA13^{oM6(fL4i9JB5nI}Wk&wr&A;f&Kxg@H7_f5`r81Vjb^1}H~Btd-LqKzICK zV_ZUrab*yYik$lgaFz`H#DEnddlSU)v_J9$VgNt^w!p8u@Fi$P;Qr5kxL6jrEr)=s zdv`n}xb$Bb*pwl~K;1Z2!UGqk%2^DM5Lt|DMw0?k3^ zdw`lGRj9=41c^6fw?H@~{vez8cs(0=Y1|7n{a25iYiKi?(F6WVbDFg`G#`fEc>2&% zlW&~wMe^pun$suzxrB~w^lggDzF(TQY>4_B&@#adoUi>`(O-UmhVjCv&-6~jF?_m) z1sE?%8+AP(9Tq$|feieCg;<0flC8p#h0o)iFhq&+2~Tt?^d`V&0ubRf%qL7paY4aD zEf6ol+sTb%iHfLQ$UfXV-P#?xxVcg7t++Gw5mx!aP4UTcD_f1V+jnp_{BUn;@8kVf z8eGjGoLU`rOKmXZLH15%&5a8SCQELNHJx;J-2b_IQ3li^bmzMe_m}C5B`}O1H>zTS zaTFYBju9h4T~_)N{NgG5+wHL**73RGy(Oe82Zm{i@UL3E>XVZ81eayUk)uB)(VSmj z=lc5aq@MJzO+fJN1d=YmR4(cJqS+5&NW?nhOb>@lcs`Xz(Z3L-84TjU zh&^N38Qrs2$0iQ40qMnq5v@63_M^*VkY#5~#Pg`{Yn-dc1h)@wGpgTXIFrUpy_T@p z@0igXx`0Y+fP!HgF&%1hb>wnhfYry4moLSN9lrgEAuMJHyyJhpa)1yleAx|b4NTw{ zL%E&efv(qWSh;Y}_o+1CSI#k{8J!Q{&E<}Oj|aj~WgfGY|JR^y2xjdIdn7gniy&Sk zhQB&3jUnn=c#t3vvhehJ1aMn~q7h{RU1V`+Zb<*@C^nFF zrHMmtz^`BIL~v7SUB!qztdWQZF1|!YVhNs0a>}7X44%0zJ|(zRE)#LY4&bZ$b60r@ z(^ce7o0vc70|qpvxOWf40lUzkVA zXDS(N=(D6?+__l>@j|#3=Av$HEW2aKtqXcumBOr(Sb>oncFm$|4((c6XFbtr zq`hg`=)>26nPY}&3U#&^A(lY`9uePcx>L`vVEs2~!4dl6BMk2v>lsTwE1`s^Z}Y(G2z0su0R9IFjs={st&o;7W)sH%_5_Ib z|Fs$d-DEfINHBxMItI-u;QtWPeG5d?NFWY7=5K+2qwKlYTjO~Yis^09B>{HER2Uc+ z^S-khILGrwLvO`$0kYrI=fI%yB0+{+e-|@?jL?ICbweOi zLh|;1XbwbRg~K4UN7#f9KoxTEKQs(JzX!gY11M7id}wLGPe3E*4qrU(bO4kN>-MAMH`c^Cqyw2GIQ^e-CWGXfFj6e{B@g21%i@ z&_?`s>EjOi24uuhhsQG14S;a|^1TA>oLygZCzab^awkmY_?3W^klpfH!zW3X&39}^ z@3Qeqa!=CM26Vn}{$*RL==;o}_vQ6OHL}vb66c;`N60-+msK(DFvZPl4APRK`Q3$0 z+NZzF8!Yayd`jaPt=b50e0Ig}o0Xg?z!Ce5%#OcU9Kl^IW{8}QE&_YTad$PIC9WA~kXuIb4W#5i4hcFjAoP+%L1Gk-CG2|jR*&qch%HeS&dB+9pqTNPJSQ;>R^Q1}?0K6Tsm^GMQbR*E5I)pcRkspf*VCDWswMG8?l zGWE;GyX6p}N`3-L+c&}6?mTbAnDxCv8I>n~E7ird)vhWewOSPpN;~!+c>mraR)p_3 zn?{#=X7MZYR;S%NkJU0|H=d~}gi}?$)ITetnQZ)w)A96Lb3||4rgYJmj$hV0#epFy zAT61S2pJV`-U#mr+3M4SUmq~NI(LgSUpBi9o5Xq=w!~a(s1^I^M^0+xPdXAma<{DB zk$+PY*O!$mg4vN~+D~||l+0c%z}FpyA#QW8KsRF_v(%(x!q-RJTI@8-Bw`eEW+0h-n?1ZCqDiCS-$_?i9NJ_@Tl{=u_7a0C?E|_* zQ~dJ1@AVCP^=zKieB*3`pZ0k=8gDL7ac(x1Ru@ciRwz8d;*A9D6qdrZO*i`LB*m;G z8A3)6e2%TLPo?L;Y7VDu7W?62NVcbSI#13|mh-(!k|$wCToK0tBRs0bGyT$w$2y+R zb{8A7lW$y3cJ+ronswF3hxS-Mf)S3l_%gA~)k>G3sP9VGc+7ueHdIybC3!I+I;tw1 z=_R0P{M_8LL5?K3mkf z0Q&#oh6vD&)B|^O!wpblc2XB@xNBQ=u3uPO>7D*bYAZIZE4g&~TyJU?6N~B5ns8B2 zRAS06oIq}DPm<8M3%rf$CGNOe2x2snXDxRJ zk2sdA=nQbA6Og`R{Pp!D^$kkQoT|e*0|V>M2WA^(B&+sO7>~44PVJ?#gx79F3jDWk zV;Q+-5-@AjIQJ+y1nb?ffvKicTiJd_F-{TA`3L!J1M>_s`0tGdAsQv!vVo6|^KSW6 zPH(lzI8xSh6Xg;)d9;bwC>t>d(;Ve$KlruGIl4}VefxBx8`|5b2+Qc!XEV z{yyQP0-PH5eVgo=f z1_-%ooQBv`am?&1BG==6e}?OIevLa=>Df_@?NMp+Zu41u($lb% zpvt4?m)nkrT@i|0Vku$)S&;8bwljFGN#?0_>sI6sIqcD?jNljSVzyPc12XQJ2VY>X zDH%Wf<+~qig~O(gqA3a3#%qrProC@`*qs|w^l~vaRV-arY#IEvkZV#+yIheZdeVH3e%Vv$u6Bo1$^LP-SgP!(r3iefl7h?Yb;@ zD(`-Baky3=3x2a6nr=a|3=(5G7>u~;jy!pXKT0Uy- z46tE((9@vJPfH0JWq zSyO8)Sk5?EO`hswexQc(P4i5-VYwPtmrKJv9z`tcxt2#*J_uJCYM@aphg(Dy?qr9Y zr!0kA1on&CvMVfaQIb$y7syRKmSb_sGO{`NVJEZP1LpgWVg=X(St?2?RzKdRyFB7)-Gk<+PgxRSmFTIV{4h zp>_4&Hio}s&%aDc%@#`9YHXx6+dSX*;L<6`La5!GjYb!|V?OC$Fz|0rWb#YQVbmV` zqhMC}p73>tR>Qk+Z^rJ_n}d?Wv!?r3c*(F&g~07!!D~AAkt9|1xY#T4 zC|uc`>z#bpgFHKEBhLu7WU=Nx;N++25Ey(AkL6b>P-q;4eo4e`Q0M+`*W2p$6ZthM z9cH=A%(c>bElM9aya@WT6@kgCXcbG?AN+EfG4&9g@WZt}K^H`R@`eg1jKX7#OJ4Pp z)kkG3hW~Na%sDMZvxOqClAUTU=`M2J)`KTSJ4#TQ6sd)?I)hmQG0b@M?a@c0QLMH! z)Vi5N<+aMPU)=BFtBV_OaNd`lH{?LCq_s%gT$R~+s}lC{1S6$54obehGB-ESd~6sa zF6+qPI579={ia9c;r;i4R&jqb4>~w$QvRN3b^yb%){Xot? zTcwFQtl@c2N@9Vy;T(y^Vvb^Udt{ar&duAo*DeDrds!6lB$$|}-frxG;wStFDj=}UgN^|b|+_R^E=6iO- zm^VCoD?XC7e?s@_OJL$oh4i4lz~YdPmE7AQ4bK;gx{iJxQ8i||lK#mL&yt;joyJ-O zzSNeW>!#pS8WObDdHB9!kUf+rN=`|-3f#ON71e&1U3c%N%@hn0AoWa%(X6@oh4D^X zk-4qNyB=my>h)U`WVY69+$L0eQoMP2KQl=*P1Oj)OmC~_5$=8KH#BTHI3NCbja0$i zk@v0VC%QEn$!Q?W4x4!M6hgIpj}?WMak7O|f}SIs6`Z$$ibhihbAZ%du{VR>zP`@M z!*W22L`$v1(R@rLoNHxP{VK}_EZv16X(oim!ks&C2DG0_)DdL_q&j62@d{DY?v5O@ zQYD8XklH1Z?KUk61}Y=*yiW$ex5UR)>-0!%j=VKn`GXB|QS{uxFf>eSeh&4S6UTtL zg6MEj-?}A$*Y;_;WTe9^PPCHeN}yeL(&ZbSmi8vVgLLXn5dES*$-%V0+JLDdYE1pU zUbuc&`lHg5UWWP{##@FS3yaVp*X}7e*6h-(3QE)|!!IVHn8}AZ4&Q`_v=hUnvZH=H z6z;v}zKN&A^>yC(qikB;_a(RPuKU#2~89DaS8sk;LEHce4U3!s@VI9 zCv$N#vFMAX#qlB5F(}kv#wHEUU=Ox_r$LNrQCjwuLHVPTrO!WqLu-2C+yw5GQsmn{ ze9El}r)oXhw_$shks+K}czXJ4GU-Uk%+O;*R!3#)|2o#%toa7mN94aBR2-B3f z{8nV~3Uxg{)uIa(r3Ihk^Wrm(LTr$9JPzdWa?tHa^Us^ zX%OZR3+B~zbZyNyay)fdqJR6_`s4GvHz&fh@3J~h(!ySigo)2g@oi3A|HrLB5(((D zTPA5|i}+C12caHmQgZj>v{r7Sc017xNO$A2X>DC>Nrb&HV}(s#Rx3YH$x?rLWv zY7(C+((4D=u>Mr2;Ht~CD)W~{R_T1db(}w$=ZUnwhHBUXh;T0%O`ocj%G`ReVkGkFTbuk%kcLJO~ zB9e``Vjwfv$6XeT8_YfaQ}++c8>s zoCC3JPj)ox%DpJ1pODf1SISe>z~fVbqCiHaNCh$rT((sYyP;1!N^_|1o%KmMcO4y! zwhx%~UV% zDuui|Lh#P98W>rDEdIm`TC7OruB7o&fN;ZI1t^vo?!pvV!G1buB`@T@GznSIR2kR^ z(7C23sP$v=i>0Pw5^}p#wLZr+#ptsk_H+G+0jbr>u=DL2IyXTu{Sf&KuluMbolDc(_#gu|pJq?*z#x+c^P{Bl;jamf{v#bp z9dMnkNDeR!Mu_M9pKooqJ2CO?Ly094%SY~Sz|-vF3RtT?+Embz+pfeN%DfwU@SG~i z$KE*Nt1XSd#9gwnzM*Eap}pK)Y-U2wKfH6j=>awdiERRRyVgj%59t%n+1Fh$f2ZE? zyU>5YHLWg+(;l=ZrsfJJ3Qpa2T@~z)*5%jqypm3F(5*Ey`KgLa$&lOfixI`PLQ~~W(Z*%=D-uZ^ zg$zj*Y+76hF{%z`8s*oYAG&m*R|F{i?>FV<(Q3+ zUK!T{IC`IcNpnFQ%-oI!RsS@!mXcUt?sZ&_x{>;O-_=VF*U;PY?>97T_btBj{Cp2v zsS}&`3q8w}PAuLe>OJ^MZa+i=i<|n8yijsTUO__%N);-jscgK6x$1knR{A^)+9_a* z&rg7=!%GmHTJ+h!Ldm$Ey*pGpPx_aw;nPzc0Tbq#@z`EBpO(+?r} zjTy+Va_wRrdG+CjO&&90;96w4FcIWM=zP3|++ z$}>pq5H7O#aYUDAJ2(XK&26?ZJ;1w5HsJ+$Wt+w-f0Lo&GDbb&6X%H5`lsf_RsH^t zO40I4M(#r??|w=|zYfQ|pdrR4_j3}K?gXPNkEuG;iZfK+xWHo1-lpUBMDekwqR@TF zO7n?ha&j{)2rJokEQGSm1O#LRm}QLqQ3c0dP>5o6nolQ6Qxufd^G!&=r>=Gz=pQER ze+ls5Lv~3y^_$jtc&gHG+BIjah%^C%Snqa}O1aO)r>EVWElHlyA}bX$*6<(QHtnF~ z_R42|>sw!Z;mrp&x|#OS^VS60=`-Xf%h`~|t*9Do@p^Ip-5V44?j^SRN7yu&ic6%+ z1;-D%wTPEpPu+V59@3>Y(o*=AJ~QywD|9LcF}`iElJTKH3*zUbXC`Z+q|8>Hpy#7$ z2D9ccGpf(*#p}JSY24O`rgX$G$3&8Rpj=&aJNw;Ez;l>-mH6PNd`a}QS*aJd&q@2) z3QL~l6Ta&|!2|WX5xN2rn|4Fw8p6N_bW`U%W)sCq?m)3tK(vE^+hO*5lnFRU`@#?* zK#Un)M6?in8EO7c6*>7o!J`bA`vTx%{5$$T!NWX^-C)GD0B0%8& zo%G+tW*{Cg?3xp2>huS5TAV$K?x+9%6oCzwhcn;Q5e4ZNoN{`gEo|*zi4l&Z_gI9^ z!{KQ?dkqcd_14R%hFPZ~;=7&s%6<5nK9)GoBx#wyO8ugvHzzIpDrVV$N>VUjoN!k_ zVnJ61L8dm0V)yE2E;6oqrhytSp)l|=2o)MfE!X}8zqR9 zm#F!9auvh!*K1#CvIvmyP7Di^>%EG;(@L!lfw8w=Sk!vS#*{2HT+WC$S+M*cOTjQx zNu(n1ZKFU|5K(QsdvTqVejRg_crSysCZ_6I#G9#=ZROD6`~f z?J9WSmPRDjWo>6>$ugMu{)zQfRCnZDSEI0dPOtm?BvXg$(VO@u^yp({FNdsmX4@*P z6@OaUHJ&kjSrQP6fxx&bep0FKfh}h@YOSO9?eHU1dq2d_9I{u~KBO5McypQ<9JmdW*1|`_YEq~p~F`<_I2((hwrhfEPA@A`N_FbgpoZ4Y-0p`~@ zH65$%-e>e#QsB_DZpPpa=`;TAptn9IkEwwD>e1W&I;ye(tfbFMu}X*BxFsjHU2X-b z;0vQ^+!ZxuYU;ZW+F%{GuKncN`THhLV}605y`J~hL&@?%;ZU!~jyFXasxI_>W=fAk zLTl$?%B;9o6 zgt2d|sg{LASxmV5XRfb{l+x-*Ip%7+Oi?2t72tqOG_ESY>z#V1I$ddnvP2zTIO^Ls zsVvU@VL00sdSjVlaa;XCG0z(PSNo|&IVKBzY{NSOMqSb@_Wyh<9iDHv{%SXo{^dB= ztPpUT{}>vM1P>i&1^)V3_F|iH$bD!5v-mI*v)J=wfJ*FE3wCbg%SjeRCTa}1lM6Zv zUxod6(Sf8TSxCGwxNXn=IWd-xT^CaF&~OM95#zMIJUfa&{5&dK zUsW5gpIdwnpVl*e+T}pJT}z>n9A%!9Eho_+XZcPu9WB%2_W8^~l#I*tWqdpB?Y8}e zD(Z)+4`ZFS!yy TXuVZEP!!LG1Pc^VRFwY*{GJvU literal 0 HcmV?d00001 diff --git a/qa/integration/fixtures/pq_drain_spec.yml b/qa/integration/fixtures/pq_drain_spec.yml new file mode 100644 index 0000000000..9b135a29e7 --- /dev/null +++ b/qa/integration/fixtures/pq_drain_spec.yml @@ -0,0 +1,3 @@ +--- +services: + - logstash \ No newline at end of file diff --git a/qa/integration/specs/pq_drain_spec.rb b/qa/integration/specs/pq_drain_spec.rb new file mode 100644 index 0000000000..b524d7547b --- /dev/null +++ b/qa/integration/specs/pq_drain_spec.rb @@ -0,0 +1,142 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require_relative '../framework/fixture' +require_relative '../framework/settings' +require_relative '../services/logstash_service' +require_relative '../framework/helpers' +require "logstash/devutils/rspec/spec_helper" + +require 'stud/temporary' + +if ENV['FEATURE_FLAG'] == 'persistent_queues' + describe "Test logstash queue draining" do + before(:all) { @fixture = Fixture.new(__FILE__) } + after(:all) { @fixture&.teardown } + + let(:logstash_service) { @fixture.get_service("logstash") } + + shared_examples 'pq drain' do |queue_compression_setting| + let(:settings_flags) { super().merge('queue.drain' => true) } + + around(:each) do |example| + Stud::Temporary.directory('data') do |tempdir| + # expand the fixture tarball into our temp data dir + data_dir_tarball = File.join(__dir__, '../fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz') + `tar --directory #{Shellwords.escape(tempdir)} --strip-components 1 -xzf "#{Shellwords.escape(data_dir_tarball)}"` + + @data_dir = tempdir + example.call + end + end + + around(:each) do |example| + Stud::Temporary.file('output') do |tempfile| + @output_file = tempfile.path + example.call + end + end + + let(:pipeline) do + <<~PIPELINE + input { generator { count => 1 type => seed } } + output { file { path => "#{@output_file}" codec => json_lines } } + PIPELINE + end + + it "reads the contents of the PQ and drains" do + + unacked_queued_elements = Pathname.new(@data_dir).glob('queue/main/checkpoint*').map { |cpf| decode_checkpoint(cpf) } + .map { |cp| (cp.elements - (cp.first_unacked_seq - cp.min_sequence)) }.reduce(&:+) + + invoke_args = %W( + --log.level=debug + --path.settings=#{File.dirname(logstash_service.application_settings_file)} + --path.data=#{@data_dir} + --pipeline.workers=2 + --pipeline.batch.size=8 + --config.string=#{pipeline} + ) + invoke_args << "-Squeue.compression=#{queue_compression_setting}" unless queue_compression_setting.nil? + + status = logstash_service.run(*invoke_args) + + aggregate_failures('process output') do + expect(status.exit_code).to be_zero + expect(status.stderr_and_stdout).to include("queue.type: persisted") + expect(status.stderr_and_stdout).to include("queue.drain: true") + expect(status.stderr_and_stdout).to include("queue.compression: #{queue_compression_setting}") unless queue_compression_setting.nil? + end + + aggregate_failures('processing result') do + # count the events, make sure they're all the right shape. + expect(::File::size(@output_file)).to_not be_zero + + written_events = ::File::read(@output_file).lines.map { |line| JSON.load(line) } + expect(written_events.size).to eq(unacked_queued_elements + 1) + timestamps = written_events.map {|event| event['@timestamp'] } + expect(timestamps.uniq.size).to eq(written_events.size) + end + + aggregate_failures('resulting queue state') do + # glob the data dir and make sure things have been cleaned up. + # we should only have a head checkpoint and a single fully-acked page. + checkpoints = Pathname.new(@data_dir).glob('queue/main/checkpoint*') + expect(checkpoints.size).to eq(1) + expect(checkpoints.first.basename.to_s).to eq('checkpoint.head') + checkpoint = decode_checkpoint(checkpoints.first) + expect(checkpoint.first_unacked_page).to eq(checkpoint.page_number) + expect(checkpoint.first_unacked_seq).to eq(checkpoint.min_sequence + checkpoint.elements) + + pages = Pathname.new(@data_dir).glob('queue/main/page*') + expect(pages.size).to eq(1) + end + end + end + + context "`queue.compression` setting" do + %w(none speed balanced size).each do |explicit_compression_setting| + context "explicit `#{explicit_compression_setting}`" do + include_examples 'pq drain', explicit_compression_setting + end + end + context "default setting" do + include_examples 'pq drain', nil + end + end + end + + def decode_checkpoint(path) + bytes = path.read(encoding: 'BINARY').bytes + + bstoi = -> (bs) { bs.reduce(0) {|m,b| (m<<8)+b } } + + version = bstoi[bytes.slice(0,2)] + pagenum = bstoi[bytes.slice(2,4)] + first_unacked_page = bstoi[bytes.slice(6,4)] + first_unacked_seq = bstoi[bytes.slice(10,8)] + min_sequence = bstoi[bytes.slice(18,8)] + elements = bstoi[bytes.slice(26,4)] + + OpenStruct.new(version: version, + page_number: pagenum, + first_unacked_page: first_unacked_page, + first_unacked_seq: first_unacked_seq, + min_sequence: min_sequence, + elements: elements) + end +end \ No newline at end of file From 1ed553002bbd3f1b2fb65981c238fc59096dec94 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Wed, 17 Sep 2025 17:16:22 +0000 Subject: [PATCH 3/8] license: add notice for `com.github.luben:zstd-jni` --- .../com.github.luben!zstd-jni-NOTICE.txt | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 tools/dependencies-report/src/main/resources/notices/com.github.luben!zstd-jni-NOTICE.txt diff --git a/tools/dependencies-report/src/main/resources/notices/com.github.luben!zstd-jni-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/com.github.luben!zstd-jni-NOTICE.txt new file mode 100644 index 0000000000..4accd5fd41 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/com.github.luben!zstd-jni-NOTICE.txt @@ -0,0 +1,26 @@ +source: https://github.com/luben/zstd-jni/blob/v1.5.7-4/LICENSE + +Copyright (c) 2015-present, Luben Karavelov/ All rights reserved. + +BSD-2-Clause License https://opensource.org/license/bsd-2-clause + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this + list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file From a480ebfdf9aa9c6058b809ecf6993e03f88adf25 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Wed, 17 Sep 2025 17:29:43 +0000 Subject: [PATCH 4/8] pq: log compression encode/decode from the codec --- .../ackedqueue/AbstractZstdAwareCompressionCodec.java | 9 ++++++++- .../src/main/java/org/logstash/ackedqueue/Queue.java | 3 --- .../logstash/ackedqueue/ZstdEnabledCompressionCodec.java | 4 +++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java index 378f0fd06f..e3376a3f93 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java @@ -1,6 +1,8 @@ package org.logstash.ackedqueue; import com.github.luben.zstd.Zstd; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.logstash.util.CleanerThreadLocal; import org.logstash.util.SetOnceReference; @@ -16,13 +18,18 @@ * deflate-compressed, the given bytes are emitted verbatim. */ abstract class AbstractZstdAwareCompressionCodec implements CompressionCodec { + // log from the concrete class + protected final Logger logger = LogManager.getLogger(this.getClass()); + @Override public byte[] decode(byte[] data) { if (!isZstd(data)) { return data; } try { - return Zstd.decompress(data); + final byte[] decoded = Zstd.decompress(data); + logger.trace("decoded {} -> {}", data.length, decoded.length); + return decoded; } catch (Exception e) { throw new RuntimeException("Exception while decoding", e); } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index de76e412b4..ceace48588 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -419,8 +419,6 @@ public long write(Queueable element) throws IOException { byte[] serializedBytes = element.serialize(); byte[] data = compressionCodec.encode(serializedBytes); - logger.trace("serialized: {}->{}", serializedBytes.length, data.length); - // the write strategy with regard to the isFull() state is to assume there is space for this element // and write it, then after write verify if we just filled the queue and wait on the notFull condition // *after* the write which is both safer for a crash condition, and the queue closing sequence. In the former case @@ -773,7 +771,6 @@ public CheckpointIO getCheckpointIO() { public Queueable deserialize(byte[] bytes) { try { byte[] decodedBytes = compressionCodec.decode(bytes); - logger.trace("deserialized: {}->{}", bytes.length, decodedBytes.length); return (Queueable)this.deserializeMethod.invoke(this.elementClass, decodedBytes); } catch (IllegalAccessException|InvocationTargetException e) { throw new QueueRuntimeException("deserialize invocation error", e); diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java index 01fb55673a..fa5a22b3ee 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java @@ -31,7 +31,9 @@ public enum Goal { @Override public byte[] encode(byte[] data) { try { - return Zstd.compress(data, internalLevel); + final byte[] encoded = Zstd.compress(data, internalLevel); + logger.trace("encoded {} -> {}", data.length, encoded.length); + return encoded; } catch (Exception e) { throw new RuntimeException("Exception while encoding", e); } From 772da724e6c82297ba2f297d2a08f6e8baaef7f0 Mon Sep 17 00:00:00 2001 From: Rye Biesemeyer Date: Wed, 17 Sep 2025 10:48:36 -0700 Subject: [PATCH 5/8] Apply docs suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- docs/reference/logstash-settings-file.md | 2 +- docs/reference/persistent-queues.md | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/reference/logstash-settings-file.md b/docs/reference/logstash-settings-file.md index b14ac26f13..aa0a27e722 100644 --- a/docs/reference/logstash-settings-file.md +++ b/docs/reference/logstash-settings-file.md @@ -68,7 +68,7 @@ The `logstash.yml` file includes these settings. | `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 | | `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 | | `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows platform, filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` | -| `queue.compression` | Set a persisted queue compression goal, which allows the pipeline to spend CPU to reduce the serialized size on disk. Acceptable values are `speed`, `balanced`, and `size`. | `none` | +| `queue.compression` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` | | `queue.drain` | When enabled, Logstash waits until the persistent queue (`queue.type: persisted`) is drained before shutting down. | `false` | | `dead_letter_queue.enable` | Flag to instruct Logstash to enable the DLQ feature supported by plugins. | `false` | | `dead_letter_queue.max_bytes` | The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. | `1024mb` | diff --git a/docs/reference/persistent-queues.md b/docs/reference/persistent-queues.md index 7694b471fd..ae289f5bf7 100644 --- a/docs/reference/persistent-queues.md +++ b/docs/reference/persistent-queues.md @@ -85,10 +85,13 @@ If you want to define values for a specific pipeline, use [`pipelines.yml`](/ref : Sets the interval in milliseconds when a checkpoint is forced on the head page. Default is `1000`. Set to `0` to eliminate periodic checkpoints. `queue.compression` {applies_to}`stack: ga 9.2` -: Sets the event compression goal for use with the persisted queue. Default is `none`. Acceptable values include: +: Sets the event compression level for use with the Persisted Queue. Default is `none`. Possible values are: * `speed`: optimize for fastest compression operation * `size`: optimize for smallest possible size on disk, spending more CPU * `balanced`: a balance between the `speed` and `size` settings +:::{important} +Enabling compression will make the PQ incompatible with previous Logstash releases that did not support compression. +::: ## Configuration notes [pq-config-notes] From b368e4960cef7186d7b1ce5eb89a949c5fce88ca Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Wed, 17 Sep 2025 17:51:57 +0000 Subject: [PATCH 6/8] remove CleanerThreadLocal utility --- .../AbstractZstdAwareCompressionCodec.java | 8 - .../org/logstash/util/CleanerThreadLocal.java | 115 ------------- .../logstash/util/CleanerThreadLocalTest.java | 159 ------------------ 3 files changed, 282 deletions(-) delete mode 100644 logstash-core/src/main/java/org/logstash/util/CleanerThreadLocal.java delete mode 100644 logstash-core/src/test/java/org/logstash/util/CleanerThreadLocalTest.java diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java index e3376a3f93..1cb54c98a7 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java @@ -3,14 +3,6 @@ import com.github.luben.zstd.Zstd; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.logstash.util.CleanerThreadLocal; -import org.logstash.util.SetOnceReference; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.lang.ref.Cleaner; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; /** * Subclasses of {@link AbstractZstdAwareCompressionCodec} are {@link CompressionCodec}s that are capable diff --git a/logstash-core/src/main/java/org/logstash/util/CleanerThreadLocal.java b/logstash-core/src/main/java/org/logstash/util/CleanerThreadLocal.java deleted file mode 100644 index 83f6d1289e..0000000000 --- a/logstash-core/src/main/java/org/logstash/util/CleanerThreadLocal.java +++ /dev/null @@ -1,115 +0,0 @@ -package org.logstash.util; - -import java.lang.ref.Cleaner; -import java.util.Objects; -import java.util.function.Consumer; -import java.util.function.Supplier; - -/** - * A {@link CleanerThreadLocal} is semantically the same as a {@link ThreadLocal}, except that a clean action - * is called against each object when the {@link CleanerThreadLocal} no longer holds a reference to it. - * @param the external value type - */ -public final class CleanerThreadLocal { - private final ThreadLocal threadLocal; - private final Consumer cleanAction; - private final Cleaner cleaner; - - private CleanerThreadLocal(final Cleaner cleaner, - final Supplier factory, - final Consumer cleanAction) { - this.cleaner = cleaner; - this.cleanAction = cleanAction; - this.threadLocal = ThreadLocal.withInitial(() -> wrap(factory.get())); - } - - public T get() { - return unwrap(threadLocal.get()); - } - - public void set(final T value) { - threadLocal.set(wrap(value)); - } - - public void remove() { - threadLocal.remove(); - } - - private T unwrap(final CleanableHolder holder) { - if (holder == null) { - return null; - } - return holder.getDelegate(); - } - - private CleanableHolder wrap(final T value) { - if (value == null) { - return null; - } - return new CleanableHolder(value, this.cleanAction); - } - - public static Factory withInitial(final Supplier initialValueSupplier) { - return new Factory<>(initialValueSupplier); - } - - public static CleanerThreadLocal withCleanAction(final Consumer cleanAction) { - return new Factory().withCleanAction(cleanAction); - } - - public static CleanerThreadLocal withCleanAction(final Consumer cleanAction, final Cleaner cleaner) { - return new Factory().withCleanAction(cleanAction, cleaner); - } - - public static class Factory { - private final Supplier initialValueSupplier; - - private Factory() { - this.initialValueSupplier = () -> null; - } - - private Factory(final Supplier initialValueSupplier) { - this.initialValueSupplier = Objects.requireNonNull(initialValueSupplier); - } - - public CleanerThreadLocal withCleanAction(final Consumer cleanAction) { - return withCleanAction(cleanAction, Cleaner.create()); - } - - public CleanerThreadLocal withCleanAction(final Consumer cleanAction, final Cleaner cleaner) { - return new CleanerThreadLocal<>(Objects.requireNonNullElseGet(cleaner, Cleaner::create), initialValueSupplier, cleanAction); - } - } - - private class CleanableHolder { - private final T delegate; - - static class CleaningAction implements Runnable { - private final T delegate; - private final Consumer cleanAction; - - CleaningAction(final T delegate, - final Consumer cleanAction) { - this.delegate = delegate; - this.cleanAction = cleanAction; - } - - @Override - public void run() { - if (delegate != null) { - cleanAction.accept(delegate); - } - } - } - - private CleanableHolder(final T delegate, - final Consumer finalizer) { - this.delegate = delegate; - cleaner.register(this, new CleaningAction(delegate, finalizer)); - } - - private T getDelegate() { - return delegate; - } - } -} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/util/CleanerThreadLocalTest.java b/logstash-core/src/test/java/org/logstash/util/CleanerThreadLocalTest.java deleted file mode 100644 index d90937d379..0000000000 --- a/logstash-core/src/test/java/org/logstash/util/CleanerThreadLocalTest.java +++ /dev/null @@ -1,159 +0,0 @@ -package org.logstash.util; - -import org.junit.Test; -import org.hamcrest.FeatureMatcher; -import org.hamcrest.Matcher; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; - -import static org.junit.Assert.assertTrue; - -public class CleanerThreadLocalTest { - @Test - public void testWithExecutorService() throws Exception { - final int threads = 100; - final int iterations = 10_000; - final ResourceFactory resourceFactory = new ResourceFactory(); - final CleanerThreadLocal threadLocal = CleanerThreadLocal - .withInitial(resourceFactory::create) - .withCleanAction(ResourceFactory.Resource::closeAction); - - assertThat(resourceFactory.getAll(), hasSize(0)); - - final ExecutorService executorService = Executors.newFixedThreadPool(threads); - for (int i = 0; i < iterations; i++) { - executorService.submit(() -> threadLocal.get().incrementAccessCounter()); - } - - // while the threads are alive, we should not have cleaned up - assertThat(resourceFactory.getAll(), allOf( - everyItem(isClosed(is(equalTo(false)))), - everyItem(closeCount(is(equalTo(0)))) - )); - - // shutdown and wait; this should kill the threads and empty the threadlocal - executorService.shutdown(); - assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); - - // make sure each of the threads got a resource, and that no resource was accessed cross-threads - assertThat(resourceFactory.getAll(), hasSize(threads)); - assertThat(resourceFactory.getAll(), everyItem(accessThreadOrigins(hasSize(1)))); - - // validate that all iterations completed - Integer totalAccesses = resourceFactory.getAll().stream().map((resource -> resource.accessCounter.get())).reduce(0, Integer::sum); - assertThat(totalAccesses, equalTo(iterations)); - - // ensure that the cleanup actions were executed - bruteForceGC(); - assertThat(resourceFactory.getAll(), allOf( - everyItem(isClosed(is(equalTo(true)))), - everyItem(closeCount(is(equalTo(1)))) - )); - } - - static Matcher isClosed(Matcher closedMatcher) { - return new FeatureMatcher(closedMatcher, "a resource", "closed") { - @Override - protected Boolean featureValueOf(ResourceFactory.Resource resource) { - return resource.isClosed(); - } - }; - } - - - static Matcher closeCount(Matcher closeCountMatcher) { - return new FeatureMatcher(closeCountMatcher, "a resource", "close count") { - @Override - protected Integer featureValueOf(ResourceFactory.Resource resource) { - return resource.closeCounter.get(); - } - }; - } - - static Matcher accessThreadOrigins(Matcher> threadOriginsMatcher) { - return new FeatureMatcher>(threadOriginsMatcher, "a resource", "access thread origins") { - @Override - protected Collection featureValueOf(ResourceFactory.Resource resource) { - return resource.threadIds.get(); - } - }; - } - - private void bruteForceGC() throws Exception { - // technically just a hint to the JVM, - // so we hint many times. - for (int i = 0; i < 100; i++) { - System.gc(); - } - } - - static class ResourceFactory { - private AtomicReference> resources = new AtomicReference<>(Set.of()); - protected AtomicInteger counter = new AtomicInteger(); - - public Resource create() { - final Resource resource = new Resource(); - resources.updateAndGet((existing) -> immutableSetAdd(existing, resource)); - return resource; - } - - public Set getAll() { - return resources.get(); - } - - class Resource { - private final int id = counter.incrementAndGet(); - - private final AtomicInteger accessCounter = new AtomicInteger(); - private final AtomicInteger closeCounter = new AtomicInteger(); - private final AtomicBoolean closed = new AtomicBoolean(); - - private final AtomicReference> threadIds = new AtomicReference<>(Set.of()); - - public int getId() { - return id; - } - - public void incrementAccessCounter() { - if (closed.get()) { - throw new IllegalStateException("closed"); - } - threadIds.updateAndGet((existing) -> immutableSetAdd(existing, getCurrentThreadId())); - accessCounter.incrementAndGet(); - } - - @SuppressWarnings("deprecation") - private static long getCurrentThreadId() { - // [JEP-425](https://openjdk.org/jeps/425) introduced `Thread#threadId()` to replace `Thread#getId()` - // in Java 19 - return Thread.currentThread().getId(); - } - - public void closeAction() { - closeCounter.incrementAndGet(); - closed.set(true); - } - - public boolean isClosed() { - return closed.get(); - } - } - - Set immutableSetAdd(Set existing, T item) { - final Set mutable = new HashSet<>(existing); - mutable.add(item); - return Set.copyOf(mutable); - } - } -} \ No newline at end of file From e90a4252ea56f1233f2aea1ceab60d11ac86bdaf Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 18 Sep 2025 18:24:39 +0000 Subject: [PATCH 7/8] license: add mapping for com.github.luben:zstd-jni --- tools/dependencies-report/src/main/resources/licenseMapping.csv | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/dependencies-report/src/main/resources/licenseMapping.csv b/tools/dependencies-report/src/main/resources/licenseMapping.csv index 128eef5996..14fb7c8c28 100644 --- a/tools/dependencies-report/src/main/resources/licenseMapping.csv +++ b/tools/dependencies-report/src/main/resources/licenseMapping.csv @@ -33,6 +33,7 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "com.fasterxml.jackson.core:jackson-databind:",https://github.com/FasterXML/jackson-databind,Apache-2.0 "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:",https://github.com/FasterXML/jackson-dataformats-binary,Apache-2.0 "com.fasterxml.jackson.module:jackson-module-afterburner:",https://github.com/FasterXML/jackson-modules-base,Apache-2.0 +"com.github.luben:zstd-jni:1.5.7-4",https://github.com/luben/zstd-jni,BSD-2-Clause "com.google.googlejavaformat:google-java-format:",https://github.com/google/google-java-format,Apache-2.0 "com.google.guava:guava:",https://github.com/google/guava,Apache-2.0 "com.google.j2objc:j2objc-annotations:",https://github.com/google/j2objc/,Apache-2.0 From 31c69d6cda10a368e69b25f7b15d2c4fd0eae3a2 Mon Sep 17 00:00:00 2001 From: Rye Biesemeyer Date: Fri, 26 Sep 2025 14:06:58 -0700 Subject: [PATCH 8/8] Apply suggestions from code review Co-authored-by: Rob Bavey --- docs/reference/logstash-settings-file.md | 2 +- docs/reference/persistent-queues.md | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/reference/logstash-settings-file.md b/docs/reference/logstash-settings-file.md index aa0a27e722..419e1e3f5a 100644 --- a/docs/reference/logstash-settings-file.md +++ b/docs/reference/logstash-settings-file.md @@ -68,7 +68,7 @@ The `logstash.yml` file includes these settings. | `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 | | `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 | | `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows platform, filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` | -| `queue.compression` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` | +| `queue.compression` {applies_to}`stack: ga 9.2` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` | | `queue.drain` | When enabled, Logstash waits until the persistent queue (`queue.type: persisted`) is drained before shutting down. | `false` | | `dead_letter_queue.enable` | Flag to instruct Logstash to enable the DLQ feature supported by plugins. | `false` | | `dead_letter_queue.max_bytes` | The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. | `1024mb` | diff --git a/docs/reference/persistent-queues.md b/docs/reference/persistent-queues.md index ae289f5bf7..b10c117981 100644 --- a/docs/reference/persistent-queues.md +++ b/docs/reference/persistent-queues.md @@ -86,11 +86,13 @@ If you want to define values for a specific pipeline, use [`pipelines.yml`](/ref `queue.compression` {applies_to}`stack: ga 9.2` : Sets the event compression level for use with the Persisted Queue. Default is `none`. Possible values are: + * `none`: does not perform compression, but reads compressed events * `speed`: optimize for fastest compression operation * `size`: optimize for smallest possible size on disk, spending more CPU * `balanced`: a balance between the `speed` and `size` settings :::{important} -Enabling compression will make the PQ incompatible with previous Logstash releases that did not support compression. +Compression can be enabled for an existing PQ, but once compressed elements have been added to a PQ, that PQ cannot be read by previous Logstash releases that did not support compression. +If you need to downgrade Logstash after enabling the PQ, you will need to either delete the PQ or run the pipeline with `queue.drain: true` first to ensure that no compressed elements remain. ::: ## Configuration notes [pq-config-notes]