diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java index c64b29fd88f..9612ce10dc6 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java @@ -32,7 +32,7 @@ * 4. Expose methods which could be called externally to change the value of metrics */ -public class BoltMetrics implements ComponentMetrics { +public class BoltMetrics extends ComponentMetrics { private final CountMetric ackCount; private final ReducedMetric processLatency; private final ReducedMetric failLatency; diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java index 5503d144def..192924b5d81 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java @@ -13,16 +13,14 @@ // limitations under the License. package com.twitter.heron.common.utils.metrics; -import com.twitter.heron.classification.InterfaceAudience; -import com.twitter.heron.classification.InterfaceStability; - /** - * Interface for common metric actions that both spouts and bolts support + * Abstract Class for common metric actions that both spouts and bolts support */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface ComponentMetrics { +public abstract class ComponentMetrics { + // Metric-name suffix reserved for value aggregating on all different streams + public static final String ALL_STREAMS_AGGREGATED = "__all-streams-aggregated"; + + public abstract void serializeDataTuple(String streamId, long latency); - void serializeDataTuple(String streamId, long latency); - void emittedTuple(String streamId); + public abstract void emittedTuple(String streamId); } diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullBoltMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullBoltMetrics.java index ee75a8529d8..3c207d72005 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullBoltMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullBoltMetrics.java @@ -54,7 +54,6 @@ public class FullBoltMetrics extends BoltMetrics { // so instance could not produce more tuples private final CountMetric outQueueFullCount; - public FullBoltMetrics() { ackCount = new MultiCountMetric(); processLatency = new MultiReducedMetric<>(new MeanReducer()); @@ -126,6 +125,8 @@ public void ackedTuple(String streamId, String sourceComponent, long latency) { ackCount.scope(streamId).incr(); processLatency.scope(streamId).update(latency); + ackCount.scope(ALL_STREAMS_AGGREGATED).incr(); + // Consider there are cases that different streams with the same streamId, // but with different source component. We need to distinguish them too. String globalStreamId = @@ -138,6 +139,8 @@ public void failedTuple(String streamId, String sourceComponent, long latency) { failCount.scope(streamId).incr(); failLatency.scope(streamId).update(latency); + failCount.scope(ALL_STREAMS_AGGREGATED).incr(); + // Consider there are cases that different streams with the same streamId, // but with different source component. We need to distinguish them too. String globalStreamId = @@ -151,6 +154,9 @@ public void executeTuple(String streamId, String sourceComponent, long latency) executeLatency.scope(streamId).update(latency); executeTimeNs.scope(streamId).incrBy(latency); + executeCount.scope(ALL_STREAMS_AGGREGATED).incr(); + executeTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency); + // Consider there are cases that different streams with the same streamId, // but with different source component. We need to distinguish them too. String globalStreamId = @@ -170,6 +176,7 @@ public void updateOutQueueFullCount() { public void deserializeDataTuple(String streamId, String sourceComponent, long latency) { deserializationTimeNs.scope(streamId).incrBy(latency); + deserializationTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency); // Consider there are cases that different streams with the same streamId, // but with different source component. We need to distinguish them too. @@ -180,6 +187,7 @@ public void deserializeDataTuple(String streamId, String sourceComponent, long l public void serializeDataTuple(String streamId, long latency) { serializationTimeNs.scope(streamId).incrBy(latency); + serializationTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency); } } diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullSpoutMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullSpoutMetrics.java index a8e176a8433..ea8346e68db 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullSpoutMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullSpoutMetrics.java @@ -110,19 +110,25 @@ public void initMultiCountMetrics(PhysicalPlanHelper helper) { public void ackedTuple(String streamId, long latency) { ackCount.scope(streamId).incr(); completeLatency.scope(streamId).update(latency); + + ackCount.scope(ALL_STREAMS_AGGREGATED).incr(); } public void failedTuple(String streamId, long latency) { failCount.scope(streamId).incr(); failLatency.scope(streamId).update(latency); + + failCount.scope(ALL_STREAMS_AGGREGATED).incr(); } public void timeoutTuple(String streamId) { timeoutCount.scope(streamId).incr(); + timeoutCount.scope(ALL_STREAMS_AGGREGATED).incr(); } public void emittedTuple(String streamId) { emitCount.scope(streamId).incr(); + emitCount.scope(ALL_STREAMS_AGGREGATED).incr(); } public void nextTuple(long latency) { @@ -140,6 +146,7 @@ public void updatePendingTuplesCount(long count) { public void serializeDataTuple(String streamId, long latency) { serializationTimeNs.scope(streamId).incrBy(latency); + serializationTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency); } } diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java index 0cc3061e18d..7659be5b6a1 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java @@ -33,7 +33,7 @@ * 4. Expose methods which could be called externally to change the value of metrics */ -public class SpoutMetrics implements ComponentMetrics { +public class SpoutMetrics extends ComponentMetrics { private final CountMetric ackCount; private final ReducedMetric completeLatency; private final ReducedMetric failLatency;