From 13f47b6c95c39b756c299539b512d5a325d004a6 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 11 Sep 2025 12:40:12 +0200 Subject: [PATCH 01/10] First rough idea to store byte_size and document count in a gauge to keep the values correlated --- .../lib/logstash/api/commands/stats.rb | 3 +++ .../execution/AbstractPipelineExt.java | 24 +++++++++++++++++++ .../QueueReadClientBatchMetrics.java | 4 ++++ .../instrument/metrics/MetricExt.java | 2 +- .../instrument/metrics/MetricKeys.java | 2 ++ .../metrics/NamespacedMetricExt.java | 5 +++- .../metrics/gauge/LazyDelegatingGauge.java | 22 +++++++++++++++++ 7 files changed, 60 insertions(+), 2 deletions(-) diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index 2ec44fe0af..6653634303 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -173,14 +173,17 @@ def plugin_stats(stats, plugin_type) end def refine_batch_metrics(stats) + current_data_point = stats[:batch][:current] { :event_count => { + :current => current_data_point, :average => { # average return a FlowMetric which and we need to invoke getValue to obtain the map with metric details. :lifetime => stats[:batch][:event_count][:average].value["lifetime"] ? stats[:batch][:event_count][:average].value["lifetime"].round : 0 } }, :byte_size => { + :current => current_data_point, :average => { :lifetime => stats[:batch][:byte_size][:average].value["lifetime"] ? stats[:batch][:byte_size][:average].value["lifetime"].round : 0 } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index ef12af32cd..8454279995 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -92,6 +92,7 @@ import org.logstash.instrument.metrics.MetricType; import org.logstash.instrument.metrics.NullMetricExt; import org.logstash.instrument.metrics.UpScaledMetric; +import org.logstash.instrument.metrics.gauge.TextGauge; import org.logstash.instrument.metrics.timer.TimerMetric; import org.logstash.instrument.metrics.UptimeMetric; import org.logstash.instrument.metrics.counter.LongCounter; @@ -607,6 +608,14 @@ private void initializeBatchMetrics(ThreadContext context) { final FlowMetric byteSizePerBatch = createFlowMetric(BATCH_AVERAGE_KEY, totalBytes, batchCounter); this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, byteSizePerBatch); storeMetric(context, batchSizeNamespace, byteSizePerBatch); + +// initOrGetTextGaugeMetric(context, buildNamespace(BATCH_KEY), BATCH_CURRENT_KEY); +// Optional textGauge = initOrGetTextGaugeMetric(context, buildNamespace(BATCH_KEY), BATCH_CURRENT_KEY); +// if (textGauge.isPresent()) { +// storeMetric(context, batchSizeNamespace, textGauge.get()); +// } else { +// LOGGER.warn("Unable to initialize batch.current gauge, it will not be available"); +// } } private boolean isBatchMetricsEnabled(ThreadContext context) { @@ -682,6 +691,21 @@ private Optional initOrGetNumberGaugeMetric(final ThreadContext con return Optional.of((NumberGauge) delegatingGauge.getMetric().get()); } + private Optional initOrGetTextGaugeMetric(final ThreadContext context, + final RubySymbol[] subPipelineNamespacePath, + final RubySymbol metricName) { + final IRubyObject collector = this.metric.collector(context); + final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath); + final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")}); + + LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class); + if (Objects.isNull(delegatingGauge.getType()) || delegatingGauge.getType() != MetricType.GAUGE_TEXT) { + return Optional.empty(); + } + + return Optional.of((TextGauge) delegatingGauge.getMetric().get()); + } + private UptimeMetric initOrGetUptimeMetric(final ThreadContext context, final RubySymbol[] subPipelineNamespacePath, final RubySymbol uptimeMetricName) { diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java index 3ed58daaef..688f82ca7a 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java @@ -7,6 +7,7 @@ import org.logstash.ext.JrubyEventExtLibrary; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.counter.LongCounter; +import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge; import java.security.SecureRandom; @@ -22,6 +23,7 @@ class QueueReadClientBatchMetrics { private LongCounter pipelineMetricBatchByteSize; private LongCounter pipelineMetricBatchTotalEvents; private final SecureRandom random = new SecureRandom(); + private LazyDelegatingGauge currentBatchDimensions; public QueueReadClientBatchMetrics(QueueFactoryExt.BatchMetricMode batchMetricMode) { this.batchMetricMode = batchMetricMode; @@ -35,6 +37,7 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) { pipelineMetricBatchCount = LongCounter.fromRubyBase(batchNamespace, BATCH_COUNT); pipelineMetricBatchTotalEvents = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_EVENTS); pipelineMetricBatchByteSize = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_BYTES); + currentBatchDimensions = LazyDelegatingGauge.fromRubyBase(batchNamespace, BATCH_CURRENT_KEY); } } @@ -69,6 +72,7 @@ private void updateBatchSizeMetric(QueueBatch batch) { pipelineMetricBatchCount.increment(); pipelineMetricBatchTotalEvents.increment(batch.filteredSize()); pipelineMetricBatchByteSize.increment(totalSize); + currentBatchDimensions.set(String.format("%d-%d", batch.filteredSize(), totalSize)); } catch (IllegalArgumentException e) { LOG.error("Failed to calculate batch byte size for metrics", e); } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java index 1303e1a753..3b1122a563 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java @@ -52,7 +52,7 @@ public final class MetricExt extends AbstractSimpleMetricExt { private static final RubySymbol DECREMENT = RubyUtil.RUBY.newSymbol("decrement"); - private static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge"); + public static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge"); private static final RubySymbol TIMER = RubyUtil.RUBY.newSymbol("timer"); private static final RubySymbol SET = RubyUtil.RUBY.newSymbol("set"); private static final RubySymbol GET = RubyUtil.RUBY.newSymbol("get"); diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java index 15f540b3dc..e304351728 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java @@ -131,4 +131,6 @@ private MetricKeys() { public static final RubySymbol BATCH_BYTE_SIZE_KEY = RubyUtil.RUBY.newSymbol("byte_size"); + public static final RubySymbol BATCH_CURRENT_KEY = RubyUtil.RUBY.newSymbol("current"); + } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java index 8a77b4e2f0..36a8acb925 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java @@ -76,7 +76,10 @@ protected IRubyObject getCounter(final ThreadContext context, final IRubyObject @Override protected IRubyObject getGauge(final ThreadContext context, final IRubyObject key, final IRubyObject value) { - return metric.gauge(context, namespaceName, key, value); +// return metric.gauge(context, namespaceName, key, value); + return collector(context).callMethod( + context, "get", new IRubyObject[]{namespaceName, key, MetricExt.GAUGE} + ); } @Override diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java index cf9f30ad8b..1eb7904567 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java @@ -23,8 +23,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jruby.RubyHash; +import org.jruby.RubySymbol; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; import org.logstash.ext.JrubyTimestampExtLibrary.RubyTimestamp; import org.logstash.instrument.metrics.AbstractMetric; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.MetricType; import java.util.List; @@ -39,11 +44,28 @@ public class LazyDelegatingGauge extends AbstractMetric implements Gauge private static final Logger LOGGER = LogManager.getLogger(LazyDelegatingGauge.class); + public static final LazyDelegatingGauge DUMMY_GAUGE = new LazyDelegatingGauge("dummy"); + protected final String key; @SuppressWarnings("rawtypes") private GaugeMetric lazyMetric; + + public static LazyDelegatingGauge fromRubyBase(final AbstractNamespacedMetricExt metric, final RubySymbol key) { + final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); + // just initialize an empty gauge + final IRubyObject gauge = metric.gauge(context, key, context.runtime.newString("undefined")); +// gauge.callMethod(context, "set", context.runtime.newString("zero")); + final LazyDelegatingGauge javaGauge; + if (LazyDelegatingGauge.class.isAssignableFrom(gauge.getJavaClass())) { + javaGauge = gauge.toJava(LazyDelegatingGauge.class); + } else { + javaGauge = DUMMY_GAUGE; + } + return javaGauge; + } + /** * Constructor - null initial value * From e300cc8ee9c7b22cf75d5e70f99904921266cc08 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 11 Sep 2025 15:08:55 +0200 Subject: [PATCH 02/10] Switched from Text gauge to a List gauge to contain the two values (document count and batch bytes size) --- .../lib/logstash/api/commands/stats.rb | 5 ++-- .../execution/AbstractPipelineExt.java | 28 +++++++++---------- .../QueueReadClientBatchMetrics.java | 4 ++- .../metrics/gauge/LazyDelegatingGauge.java | 3 +- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index 6653634303..aa10e5431a 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -176,14 +176,15 @@ def refine_batch_metrics(stats) current_data_point = stats[:batch][:current] { :event_count => { - :current => current_data_point, + # current_data_point is an instance of org.logstash.instrument.metrics.gauge.LazyDelegatingGauge so need to invoke getValue() to obtain the actual value + :current => current_data_point.value[0], :average => { # average return a FlowMetric which and we need to invoke getValue to obtain the map with metric details. :lifetime => stats[:batch][:event_count][:average].value["lifetime"] ? stats[:batch][:event_count][:average].value["lifetime"].round : 0 } }, :byte_size => { - :current => current_data_point, + :current => current_data_point.value[1], :average => { :lifetime => stats[:batch][:byte_size][:average].value["lifetime"] ? stats[:batch][:byte_size][:average].value["lifetime"].round : 0 } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 8454279995..f6e2f47dbd 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -691,20 +691,20 @@ private Optional initOrGetNumberGaugeMetric(final ThreadContext con return Optional.of((NumberGauge) delegatingGauge.getMetric().get()); } - private Optional initOrGetTextGaugeMetric(final ThreadContext context, - final RubySymbol[] subPipelineNamespacePath, - final RubySymbol metricName) { - final IRubyObject collector = this.metric.collector(context); - final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath); - final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")}); - - LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class); - if (Objects.isNull(delegatingGauge.getType()) || delegatingGauge.getType() != MetricType.GAUGE_TEXT) { - return Optional.empty(); - } - - return Optional.of((TextGauge) delegatingGauge.getMetric().get()); - } +// private Optional initOrGetTextGaugeMetric(final ThreadContext context, +// final RubySymbol[] subPipelineNamespacePath, +// final RubySymbol metricName) { +// final IRubyObject collector = this.metric.collector(context); +// final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath); +// final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")}); +// +// LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class); +// if (Objects.isNull(delegatingGauge.getType()) || delegatingGauge.getType() != MetricType.GAUGE_TEXT) { +// return Optional.empty(); +// } +// +// return Optional.of((TextGauge) delegatingGauge.getMetric().get()); +// } private UptimeMetric initOrGetUptimeMetric(final ThreadContext context, final RubySymbol[] subPipelineNamespacePath, diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java index 688f82ca7a..cb54b74eb9 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java @@ -10,6 +10,7 @@ import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge; import java.security.SecureRandom; +import java.util.Arrays; import static org.logstash.instrument.metrics.MetricKeys.*; @@ -72,7 +73,8 @@ private void updateBatchSizeMetric(QueueBatch batch) { pipelineMetricBatchCount.increment(); pipelineMetricBatchTotalEvents.increment(batch.filteredSize()); pipelineMetricBatchByteSize.increment(totalSize); - currentBatchDimensions.set(String.format("%d-%d", batch.filteredSize(), totalSize)); +// currentBatchDimensions.set(String.format("%d-%d", batch.filteredSize(), totalSize)); + currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalSize)); } catch (IllegalArgumentException e) { LOG.error("Failed to calculate batch byte size for metrics", e); } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java index 1eb7904567..ac4f57ceb7 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java @@ -32,6 +32,7 @@ import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.MetricType; +import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -55,7 +56,7 @@ public class LazyDelegatingGauge extends AbstractMetric implements Gauge public static LazyDelegatingGauge fromRubyBase(final AbstractNamespacedMetricExt metric, final RubySymbol key) { final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); // just initialize an empty gauge - final IRubyObject gauge = metric.gauge(context, key, context.runtime.newString("undefined")); + final IRubyObject gauge = metric.gauge(context, key, context.runtime.newArray(context.runtime.newString("undefined"), context.runtime.newString("undefined"))); // gauge.callMethod(context, "set", context.runtime.newString("zero")); final LazyDelegatingGauge javaGauge; if (LazyDelegatingGauge.class.isAssignableFrom(gauge.getJavaClass())) { From 44b86e80cceeb18a03b1acf104b420c440057009 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 11 Sep 2025 15:18:50 +0200 Subject: [PATCH 03/10] [Test] Added creation og Gauge in the mocking NamespacedMetric test implementation --- .../logstash/instrument/metrics/MockNamespacedMetric.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java b/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java index 91ea0e49d0..2f784fa911 100644 --- a/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java +++ b/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java @@ -9,6 +9,7 @@ import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; import org.logstash.instrument.metrics.counter.LongCounter; +import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge; import org.logstash.instrument.metrics.timer.TimerMetric; import java.util.Objects; @@ -36,7 +37,9 @@ public static MockNamespacedMetric create() { @Override protected IRubyObject getGauge(ThreadContext context, IRubyObject key, IRubyObject value) { - return null; + Objects.requireNonNull(key); + requireRubySymbol(key, "key"); + return RubyUtil.toRubyObject(metrics.computeIfAbsent(key.asJavaString(), LazyDelegatingGauge::new)); } @Override From de96c8f17433eb8c15092743db6ac0586bd8e984 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 11 Sep 2025 16:09:17 +0200 Subject: [PATCH 04/10] Fixed NamespacedMetricExt.getGauge to effectively return the Gauge metric instance stored in the metric store. --- .../org/logstash/instrument/metrics/NamespacedMetricExt.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java index 36a8acb925..388cfa7de7 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java @@ -76,7 +76,7 @@ protected IRubyObject getCounter(final ThreadContext context, final IRubyObject @Override protected IRubyObject getGauge(final ThreadContext context, final IRubyObject key, final IRubyObject value) { -// return metric.gauge(context, namespaceName, key, value); + metric.gauge(context, namespaceName, key, value); return collector(context).callMethod( context, "get", new IRubyObject[]{namespaceName, key, MetricExt.GAUGE} ); From 00a395b3ccdd6b7d60e52c793ef89c9b6e3418aa Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 11 Sep 2025 17:21:27 +0200 Subject: [PATCH 05/10] [Test] Updated Monitoring API QA test to check also the current byte_size and event_count --- qa/integration/specs/monitoring_api_spec.rb | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index a77c113ca7..4c3e43a824 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -269,11 +269,17 @@ expect(batch_stats["event_count"]["average"]["lifetime"]).to be_a_kind_of(Numeric) expect(batch_stats["event_count"]["average"]["lifetime"]).to be > 0 + expect(batch_stats["event_count"]["current"]).not_to be_nil + expect(batch_stats["event_count"]["current"]).to be >= 0 + expect(batch_stats["byte_size"]).not_to be_nil expect(batch_stats["byte_size"]["average"]).not_to be_nil expect(batch_stats["byte_size"]["average"]["lifetime"]).not_to be_nil expect(batch_stats["byte_size"]["average"]["lifetime"]).to be_a_kind_of(Numeric) expect(batch_stats["byte_size"]["average"]["lifetime"]).to be > 0 + + expect(batch_stats["byte_size"]["current"]).not_to be_nil + expect(batch_stats["byte_size"]["current"]).to be >= 0 end end end From 8ab663b2ba1aca148589fc3b4e9167cbf9bafcde Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 24 Sep 2025 09:29:23 +0200 Subject: [PATCH 06/10] Updated node_stats response that verify structure to include also the new 'current'. --- logstash-core/lib/logstash/api/commands/stats.rb | 2 ++ logstash-core/spec/logstash/api/modules/node_stats_spec.rb | 2 ++ .../org/logstash/execution/QueueReadClientBatchMetrics.java | 1 - .../logstash/instrument/metrics/gauge/LazyDelegatingGauge.java | 1 - 4 files changed, 4 insertions(+), 2 deletions(-) diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index aa10e5431a..7ae782fadf 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -173,6 +173,8 @@ def plugin_stats(stats, plugin_type) end def refine_batch_metrics(stats) + # current is a tuple of [event_count, byte_size] store the reference locally to avoid repeatedly + # reading and retrieve unrelated values current_data_point = stats[:batch][:current] { :event_count => { diff --git a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb index 71468fdd63..136c1349db 100644 --- a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb +++ b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb @@ -150,11 +150,13 @@ }, "batch" => { "event_count" => { + "current" => Numeric, "average" => { "lifetime" => Numeric } }, "byte_size" => { + "current" => Numeric, "average" => { "lifetime" => Numeric } diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java index cb54b74eb9..b7d6dc9c05 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java @@ -73,7 +73,6 @@ private void updateBatchSizeMetric(QueueBatch batch) { pipelineMetricBatchCount.increment(); pipelineMetricBatchTotalEvents.increment(batch.filteredSize()); pipelineMetricBatchByteSize.increment(totalSize); -// currentBatchDimensions.set(String.format("%d-%d", batch.filteredSize(), totalSize)); currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalSize)); } catch (IllegalArgumentException e) { LOG.error("Failed to calculate batch byte size for metrics", e); diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java index ac4f57ceb7..10c10fceef 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java @@ -57,7 +57,6 @@ public static LazyDelegatingGauge fromRubyBase(final AbstractNamespacedMetricExt final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); // just initialize an empty gauge final IRubyObject gauge = metric.gauge(context, key, context.runtime.newArray(context.runtime.newString("undefined"), context.runtime.newString("undefined"))); -// gauge.callMethod(context, "set", context.runtime.newString("zero")); final LazyDelegatingGauge javaGauge; if (LazyDelegatingGauge.class.isAssignableFrom(gauge.getJavaClass())) { javaGauge = gauge.toJava(LazyDelegatingGauge.class); From cc22aa24a1fa1fa899294464064dbedfe47135dd Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 24 Sep 2025 10:03:47 +0200 Subject: [PATCH 07/10] Minor, removed commented code --- .../execution/AbstractPipelineExt.java | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index f6e2f47dbd..5bf98abff1 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -608,14 +608,6 @@ private void initializeBatchMetrics(ThreadContext context) { final FlowMetric byteSizePerBatch = createFlowMetric(BATCH_AVERAGE_KEY, totalBytes, batchCounter); this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, byteSizePerBatch); storeMetric(context, batchSizeNamespace, byteSizePerBatch); - -// initOrGetTextGaugeMetric(context, buildNamespace(BATCH_KEY), BATCH_CURRENT_KEY); -// Optional textGauge = initOrGetTextGaugeMetric(context, buildNamespace(BATCH_KEY), BATCH_CURRENT_KEY); -// if (textGauge.isPresent()) { -// storeMetric(context, batchSizeNamespace, textGauge.get()); -// } else { -// LOGGER.warn("Unable to initialize batch.current gauge, it will not be available"); -// } } private boolean isBatchMetricsEnabled(ThreadContext context) { @@ -691,21 +683,6 @@ private Optional initOrGetNumberGaugeMetric(final ThreadContext con return Optional.of((NumberGauge) delegatingGauge.getMetric().get()); } -// private Optional initOrGetTextGaugeMetric(final ThreadContext context, -// final RubySymbol[] subPipelineNamespacePath, -// final RubySymbol metricName) { -// final IRubyObject collector = this.metric.collector(context); -// final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath); -// final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")}); -// -// LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class); -// if (Objects.isNull(delegatingGauge.getType()) || delegatingGauge.getType() != MetricType.GAUGE_TEXT) { -// return Optional.empty(); -// } -// -// return Optional.of((TextGauge) delegatingGauge.getMetric().get()); -// } - private UptimeMetric initOrGetUptimeMetric(final ThreadContext context, final RubySymbol[] subPipelineNamespacePath, final RubySymbol uptimeMetricName) { From ec0891bc1b4788f22e37df7e69097eae0b7eb1c6 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 24 Sep 2025 16:56:17 +0200 Subject: [PATCH 08/10] Fixed current batch metric to effectively show the current value and not the last value --- .../logstash/execution/QueueReadClientBatchMetrics.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java index b7d6dc9c05..eb38f2c9c4 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java @@ -43,12 +43,13 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) { } public void updateBatchMetrics(QueueBatch batch) { - if (batch.events().isEmpty()) { - // avoid to increment batch count for empty batches + if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) { return; } - if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) { + if (batch.events().isEmpty()) { + // avoid to increment batch count for empty batches + currentBatchDimensions.set(Arrays.asList(0L, 0L)); return; } From 60284afe6e3640d370970d81e5b0088232321a33 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Thu, 25 Sep 2025 15:57:39 +0200 Subject: [PATCH 09/10] Better description when a batch size update is skipped MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- .../org/logstash/execution/QueueReadClientBatchMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java index eb38f2c9c4..8c38051838 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java @@ -48,7 +48,7 @@ public void updateBatchMetrics(QueueBatch batch) { } if (batch.events().isEmpty()) { - // avoid to increment batch count for empty batches + // don't update averages for empty batches, but set current back to zero currentBatchDimensions.set(Arrays.asList(0L, 0L)); return; } From dc5fe28b9f0459c1682b56b27efdc824911682e6 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 25 Sep 2025 16:12:09 +0200 Subject: [PATCH 10/10] Moved COUNTER and GAUGE constants to be package-private and explained why --- .../logstash/execution/QueueReadClientBatchMetrics.java | 8 ++++---- .../java/org/logstash/instrument/metrics/MetricExt.java | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java index 8c38051838..a91cdf5ded 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java @@ -67,14 +67,14 @@ public void updateBatchMetrics(QueueBatch batch) { private void updateBatchSizeMetric(QueueBatch batch) { try { // if an error occurs in estimating the size of the batch, no counter has to be updated - long totalSize = 0L; + long totalByteSize = 0L; for (JrubyEventExtLibrary.RubyEvent rubyEvent : batch.events()) { - totalSize += rubyEvent.getEvent().estimateMemory(); + totalByteSize += rubyEvent.getEvent().estimateMemory(); } pipelineMetricBatchCount.increment(); pipelineMetricBatchTotalEvents.increment(batch.filteredSize()); - pipelineMetricBatchByteSize.increment(totalSize); - currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalSize)); + pipelineMetricBatchByteSize.increment(totalByteSize); + currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalByteSize)); } catch (IllegalArgumentException e) { LOG.error("Failed to calculate batch byte size for metrics", e); } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java index 3b1122a563..f842500e19 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java @@ -44,15 +44,15 @@ public final class MetricExt extends AbstractSimpleMetricExt { private static final long serialVersionUID = 1L; - public static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter"); + // These two metric type symbols need to be package-private because used in NamespacedMetricExt + static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter"); + static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge"); private static final RubyFixnum ONE = RubyUtil.RUBY.newFixnum(1); private static final RubySymbol INCREMENT = RubyUtil.RUBY.newSymbol("increment"); private static final RubySymbol DECREMENT = RubyUtil.RUBY.newSymbol("decrement"); - - public static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge"); private static final RubySymbol TIMER = RubyUtil.RUBY.newSymbol("timer"); private static final RubySymbol SET = RubyUtil.RUBY.newSymbol("set"); private static final RubySymbol GET = RubyUtil.RUBY.newSymbol("get");