From 254f4d6381099240198623c832d03372fc80628f Mon Sep 17 00:00:00 2001 From: dancollins34 <33334385+dancollins34@users.noreply.github.com> Date: Tue, 2 Jan 2018 21:22:20 -0500 Subject: [PATCH 01/13] Add support for custom evictor and trigger policies Add support in both the streamlet and topology apis for the usage of custom eviction policies extending com.twitter.heron.api.windowing.EvictionPolicy and trigger policies extending com.twitter.heron.api.windowing.TriggerPolicy to enable user-defined windowing schemes. --- .../heron/api/bolt/BaseWindowedBolt.java | 27 ++++++++++++++++--- .../heron/api/bolt/WindowedBoltExecutor.java | 18 ++++++++++--- .../heron/api/windowing/WindowingConfigs.java | 22 +++++++++++++++ .../twitter/heron/streamlet/WindowConfig.java | 13 +++++++++ .../streamlet/impl/WindowConfigImpl.java | 16 ++++++++++- 5 files changed, 89 insertions(+), 7 deletions(-) diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java b/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java index adc0e4735b4..8eeb272bc7e 100644 --- a/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java +++ b/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java @@ -38,9 +38,8 @@ import com.twitter.heron.api.topology.OutputFieldsDeclarer; import com.twitter.heron.api.topology.TopologyContext; -import com.twitter.heron.api.windowing.TimestampExtractor; -import com.twitter.heron.api.windowing.TupleFieldTimestampExtractor; -import com.twitter.heron.api.windowing.WindowingConfigs; +import com.twitter.heron.api.tuple.Tuple; +import com.twitter.heron.api.windowing.*; public abstract class BaseWindowedBolt implements IWindowedBolt { private static final long serialVersionUID = 5688213068448231559L; @@ -312,6 +311,28 @@ public BaseWindowedBolt withWatermarkInterval(Duration interval) { return this; } + /** + * Sets a custom eviction policy to use for this bolt + * + * @param evictionPolicy the eviction policy to use + * @return this + */ + public BaseWindowedBolt withCustomEvictor(EvictionPolicy evictionPolicy){ + windowConfiguration.setTopologyBoltsWindowCustomEvictor(evictionPolicy); + return this; + } + + /** + * Sets a custom trigger policy to use for this bolt + * + * @param triggerPolicy the trigger policy to use + * @return this + */ + public BaseWindowedBolt withCustomTrigger(TriggerPolicy triggerPolicy){ + windowConfiguration.setTopologyBoltsWindowCustomTrigger(triggerPolicy); + return this; + } + @Override public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) { diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java index 374f0a7684f..b309b229692 100644 --- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java +++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java @@ -237,9 +237,21 @@ private WindowManager initWindowManager(WindowLifecycleListener // validate validate(topoConf, windowLengthCount, windowLengthDurationMs, slidingIntervalCount, slidingIntervalDurationMs); - evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDurationMs); - triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDurationMs, manager, - evictionPolicy, topoConf); + + if(topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR)){ + evictionPolicy = (EvictionPolicy) topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR); + }else{ + evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDurationMs); + } + + if(topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER)){ + triggerPolicy = (TriggerPolicy) topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER); + }else{ + triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDurationMs, manager, + evictionPolicy, topoConf); + } + + manager.setEvictionPolicy(evictionPolicy); manager.setTriggerPolicy(triggerPolicy); // restore state if there is existing state diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java index f73eb981fa5..647cf4608ae 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java @@ -32,6 +32,8 @@ package com.twitter.heron.api.windowing; +import com.twitter.heron.api.tuple.Tuple; + import java.util.HashMap; import java.util.Map; @@ -90,6 +92,10 @@ public class WindowingConfigs extends HashMap { public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS = "topology.bolts" + ".watermark.event.interval.ms"; + public static final String TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR = "topology.bolts.window.custom.evictor"; + + public static final String TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER = "topology.bolts.window.custom.trigger"; + public void setTopologyBoltsWindowLengthCount(long value) { setTopologyBoltsWindowLengthCount(this, value); } @@ -147,4 +153,20 @@ public static void setTopologyBoltsWatermarkEventIntervalMs( Map conf, long value) { conf.put(TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, value); } + + public void setTopologyBoltsWindowCustomEvictor(EvictionPolicy value){ + setTopologyBoltsWindowCustomEvictor(this, value); + } + + public static void setTopologyBoltsWindowCustomEvictor(Map conf, EvictionPolicy value){ + conf.put(TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR, value); + } + + public void setTopologyBoltsWindowCustomTrigger(TriggerPolicy value){ + setTopologyBoltsWindowCustomTrigger(this, value); + } + + public static void setTopologyBoltsWindowCustomTrigger(Map conf, TriggerPolicy value){ + conf.put(TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER, value); + } } diff --git a/heron/api/src/java/com/twitter/heron/streamlet/WindowConfig.java b/heron/api/src/java/com/twitter/heron/streamlet/WindowConfig.java index c5986c774f5..2756d4d0af8 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/WindowConfig.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/WindowConfig.java @@ -17,6 +17,9 @@ import java.time.Duration; +import com.twitter.heron.api.tuple.Tuple; +import com.twitter.heron.api.windowing.EvictionPolicy; +import com.twitter.heron.api.windowing.TriggerPolicy; import com.twitter.heron.streamlet.impl.WindowConfigImpl; /** @@ -64,4 +67,14 @@ static WindowConfig TumblingCountWindow(int windowSize) { static WindowConfig SlidingCountWindow(int windowSize, int slideSize) { return new WindowConfigImpl(windowSize, slideSize); } + + /** + * Creates a window based on the provided custom trigger and eviction policies + * @param triggerPolicy The trigger policy to use + * @param evictionPolicy The eviction policy to use + * @return WindowConfig that can be passed to the transformation + */ + static WindowConfig CustomWindow(TriggerPolicy triggerPolicy, EvictionPolicy evictionPolicy){ + return new WindowConfigImpl(triggerPolicy, evictionPolicy); + } } diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/WindowConfigImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/WindowConfigImpl.java index 7721b569487..bec8ab8f085 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/impl/WindowConfigImpl.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/WindowConfigImpl.java @@ -18,18 +18,23 @@ import java.time.Duration; import com.twitter.heron.api.bolt.BaseWindowedBolt; +import com.twitter.heron.api.tuple.Tuple; +import com.twitter.heron.api.windowing.EvictionPolicy; +import com.twitter.heron.api.windowing.TriggerPolicy; import com.twitter.heron.streamlet.WindowConfig; /** * WindowConfigImpl implements the WindowConfig interface. */ public final class WindowConfigImpl implements WindowConfig { - private enum WindowType { TIME, COUNT } + private enum WindowType { TIME, COUNT, CUSTOM } private WindowType windowType; private int windowSize; private int slideInterval; private Duration windowDuration; private Duration slidingIntervalDuration; + private TriggerPolicy triggerPolicy; + private EvictionPolicy evictionPolicy; public WindowConfigImpl(Duration windowDuration, Duration slidingIntervalDuration) { this.windowType = WindowType.TIME; @@ -41,6 +46,11 @@ public WindowConfigImpl(int windowSize, int slideInterval) { this.windowSize = windowSize; this.slideInterval = slideInterval; } + public WindowConfigImpl(TriggerPolicy triggerPolicy, EvictionPolicy evictionPolicy){ + this.windowType = WindowType.CUSTOM; + this.triggerPolicy = triggerPolicy; + this.evictionPolicy = evictionPolicy; + } public void attachWindowConfig(BaseWindowedBolt bolt) { switch(windowType) { @@ -51,6 +61,10 @@ public void attachWindowConfig(BaseWindowedBolt bolt) { case TIME: bolt.withWindow(windowDuration, slidingIntervalDuration); break; + case CUSTOM: + bolt.withCustomEvictor(evictionPolicy); + bolt.withCustomTrigger(triggerPolicy); + break; default: throw new RuntimeException("Unknown windowType " + String.valueOf(windowType)); } From 44da17b4e74844a6d364cf9c4f3810cd0b666baf Mon Sep 17 00:00:00 2001 From: dancollins34 <33334385+dancollins34@users.noreply.github.com> Date: Tue, 2 Jan 2018 23:55:12 -0500 Subject: [PATCH 02/13] Fixed Checkstyle Issues Fixed Checkstyle Issues --- .../heron/api/bolt/BaseWindowedBolt.java | 10 ++++++--- .../heron/api/bolt/WindowedBoltExecutor.java | 14 +++++++------ .../heron/api/windowing/WindowingConfigs.java | 21 ++++++++++++------- .../twitter/heron/streamlet/WindowConfig.java | 3 ++- .../streamlet/impl/WindowConfigImpl.java | 3 ++- 5 files changed, 32 insertions(+), 19 deletions(-) diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java b/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java index 8eeb272bc7e..b242f2eb970 100644 --- a/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java +++ b/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java @@ -39,7 +39,11 @@ import com.twitter.heron.api.topology.OutputFieldsDeclarer; import com.twitter.heron.api.topology.TopologyContext; import com.twitter.heron.api.tuple.Tuple; -import com.twitter.heron.api.windowing.*; +import com.twitter.heron.api.windowing.EvictionPolicy; +import com.twitter.heron.api.windowing.TimestampExtractor; +import com.twitter.heron.api.windowing.TriggerPolicy; +import com.twitter.heron.api.windowing.TupleFieldTimestampExtractor; +import com.twitter.heron.api.windowing.WindowingConfigs; public abstract class BaseWindowedBolt implements IWindowedBolt { private static final long serialVersionUID = 5688213068448231559L; @@ -317,7 +321,7 @@ public BaseWindowedBolt withWatermarkInterval(Duration interval) { * @param evictionPolicy the eviction policy to use * @return this */ - public BaseWindowedBolt withCustomEvictor(EvictionPolicy evictionPolicy){ + public BaseWindowedBolt withCustomEvictor(EvictionPolicy evictionPolicy) { windowConfiguration.setTopologyBoltsWindowCustomEvictor(evictionPolicy); return this; } @@ -328,7 +332,7 @@ public BaseWindowedBolt withCustomEvictor(EvictionPolicy evictionPolic * @param triggerPolicy the trigger policy to use * @return this */ - public BaseWindowedBolt withCustomTrigger(TriggerPolicy triggerPolicy){ + public BaseWindowedBolt withCustomTrigger(TriggerPolicy triggerPolicy) { windowConfiguration.setTopologyBoltsWindowCustomTrigger(triggerPolicy); return this; } diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java index b309b229692..a32f0f52e2b 100644 --- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java +++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java @@ -238,15 +238,17 @@ private WindowManager initWindowManager(WindowLifecycleListener validate(topoConf, windowLengthCount, windowLengthDurationMs, slidingIntervalCount, slidingIntervalDurationMs); - if(topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR)){ - evictionPolicy = (EvictionPolicy) topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR); - }else{ + if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR)) { + evictionPolicy = (EvictionPolicy) + topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR); + } else { evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDurationMs); } - if(topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER)){ - triggerPolicy = (TriggerPolicy) topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER); - }else{ + if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER)) { + triggerPolicy = (TriggerPolicy) + topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER); + } else { triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDurationMs, manager, evictionPolicy, topoConf); } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java index 647cf4608ae..811f8e891ef 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java @@ -32,11 +32,12 @@ package com.twitter.heron.api.windowing; -import com.twitter.heron.api.tuple.Tuple; - import java.util.HashMap; import java.util.Map; +import com.twitter.heron.api.tuple.Tuple; + + public class WindowingConfigs extends HashMap { private static final long serialVersionUID = 1395902349429869055L; @@ -92,9 +93,11 @@ public class WindowingConfigs extends HashMap { public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS = "topology.bolts" + ".watermark.event.interval.ms"; - public static final String TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR = "topology.bolts.window.custom.evictor"; + public static final String TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR = + "topology.bolts.window.custom.evictor"; - public static final String TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER = "topology.bolts.window.custom.trigger"; + public static final String TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER = + "topology.bolts.window.custom.trigger"; public void setTopologyBoltsWindowLengthCount(long value) { setTopologyBoltsWindowLengthCount(this, value); @@ -154,19 +157,21 @@ public static void setTopologyBoltsWatermarkEventIntervalMs( conf.put(TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, value); } - public void setTopologyBoltsWindowCustomEvictor(EvictionPolicy value){ + public void setTopologyBoltsWindowCustomEvictor(EvictionPolicy value) { setTopologyBoltsWindowCustomEvictor(this, value); } - public static void setTopologyBoltsWindowCustomEvictor(Map conf, EvictionPolicy value){ + public static void setTopologyBoltsWindowCustomEvictor(Map conf, + EvictionPolicy value) { conf.put(TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR, value); } - public void setTopologyBoltsWindowCustomTrigger(TriggerPolicy value){ + public void setTopologyBoltsWindowCustomTrigger(TriggerPolicy value) { setTopologyBoltsWindowCustomTrigger(this, value); } - public static void setTopologyBoltsWindowCustomTrigger(Map conf, TriggerPolicy value){ + public static void setTopologyBoltsWindowCustomTrigger(Map conf, + TriggerPolicy value) { conf.put(TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER, value); } } diff --git a/heron/api/src/java/com/twitter/heron/streamlet/WindowConfig.java b/heron/api/src/java/com/twitter/heron/streamlet/WindowConfig.java index 2756d4d0af8..ba49d0ca412 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/WindowConfig.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/WindowConfig.java @@ -74,7 +74,8 @@ static WindowConfig SlidingCountWindow(int windowSize, int slideSize) { * @param evictionPolicy The eviction policy to use * @return WindowConfig that can be passed to the transformation */ - static WindowConfig CustomWindow(TriggerPolicy triggerPolicy, EvictionPolicy evictionPolicy){ + static WindowConfig CustomWindow(TriggerPolicy triggerPolicy, + EvictionPolicy evictionPolicy) { return new WindowConfigImpl(triggerPolicy, evictionPolicy); } } diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/WindowConfigImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/WindowConfigImpl.java index bec8ab8f085..56721ac97ab 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/impl/WindowConfigImpl.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/WindowConfigImpl.java @@ -46,7 +46,8 @@ public WindowConfigImpl(int windowSize, int slideInterval) { this.windowSize = windowSize; this.slideInterval = slideInterval; } - public WindowConfigImpl(TriggerPolicy triggerPolicy, EvictionPolicy evictionPolicy){ + public WindowConfigImpl(TriggerPolicy triggerPolicy, + EvictionPolicy evictionPolicy) { this.windowType = WindowType.CUSTOM; this.triggerPolicy = triggerPolicy; this.evictionPolicy = evictionPolicy; From ef1357af89053ed687f97eca68392e06a6d1b6bf Mon Sep 17 00:00:00 2001 From: dancollins34 <33334385+dancollins34@users.noreply.github.com> Date: Thu, 4 Jan 2018 22:18:55 -0500 Subject: [PATCH 03/13] Fixed extra line errors, reorganized if statement in WIndowedBoltExecutor to require both Policies to be provided if one of them is --- .../heron/api/bolt/WindowedBoltExecutor.java | 29 +++++++++++-------- .../heron/api/windowing/WindowingConfigs.java | 1 - 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java index a32f0f52e2b..278ecaacd25 100644 --- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java +++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java @@ -234,26 +234,31 @@ private WindowManager initWindowManager(WindowLifecycleListener "Late tuple stream can be defined only when " + "specifying" + " a timestamp field"); } } - // validate - validate(topoConf, windowLengthCount, windowLengthDurationMs, slidingIntervalCount, - slidingIntervalDurationMs); - if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR)) { - evictionPolicy = (EvictionPolicy) - topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR); - } else { - evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDurationMs); - } + boolean hasCustomTrigger = topoConf + .containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER); + boolean hasCustomEvictor = topoConf + .containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR); - if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER)) { + if (hasCustomTrigger && hasCustomEvictor) { triggerPolicy = (TriggerPolicy) topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER); - } else { + evictionPolicy = (EvictionPolicy) + topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR); + } else if (!hasCustomEvictor && !hasCustomTrigger) { + // validate + validate(topoConf, windowLengthCount, windowLengthDurationMs, slidingIntervalCount, + slidingIntervalDurationMs); + triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDurationMs, manager, evictionPolicy, topoConf); + evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDurationMs); + } else { + throw new IllegalArgumentException( + "If either a custom TriggerPolicy or EvictionPolicy is defined, both must be." + ); } - manager.setEvictionPolicy(evictionPolicy); manager.setTriggerPolicy(triggerPolicy); // restore state if there is existing state diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java index 811f8e891ef..cb49d48d8bc 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java @@ -37,7 +37,6 @@ import com.twitter.heron.api.tuple.Tuple; - public class WindowingConfigs extends HashMap { private static final long serialVersionUID = 1395902349429869055L; From 3184617a812bba73e11f917a07307a6617e23fea Mon Sep 17 00:00:00 2001 From: dancollins34 <33334385+dancollins34@users.noreply.github.com> Date: Fri, 5 Jan 2018 00:03:53 -0500 Subject: [PATCH 04/13] Reversed policy order to fix null pointer exception --- .../java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java index 278ecaacd25..b97902c6388 100644 --- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java +++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java @@ -250,9 +250,9 @@ private WindowManager initWindowManager(WindowLifecycleListener validate(topoConf, windowLengthCount, windowLengthDurationMs, slidingIntervalCount, slidingIntervalDurationMs); + evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDurationMs); triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDurationMs, manager, evictionPolicy, topoConf); - evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDurationMs); } else { throw new IllegalArgumentException( "If either a custom TriggerPolicy or EvictionPolicy is defined, both must be." From 6a50ee7e15f54ee5e2ec12afdbde2a7b3eea106b Mon Sep 17 00:00:00 2001 From: dancollins34 <33334385+dancollins34@users.noreply.github.com> Date: Sun, 7 Jan 2018 17:22:57 -0500 Subject: [PATCH 05/13] Trial push (local compilation not working for some reason, going to push then pull clean copy) --- .../heron/api/bolt/WindowedBoltExecutor.java | 22 +++--- .../heron/api/windowing/TriggerPolicy.java | 30 +++++++- .../triggers/AbstractBaseTriggerPolicy.java | 75 +++++++++++++++++++ .../triggers/CountTriggerPolicy.java | 17 +---- .../windowing/triggers/TimeTriggerPolicy.java | 20 +---- .../triggers/WatermarkCountTriggerPolicy.java | 23 +----- .../triggers/WatermarkTimeTriggerPolicy.java | 23 +----- 7 files changed, 127 insertions(+), 83 deletions(-) create mode 100644 heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java index b97902c6388..53b13a18f89 100644 --- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java +++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java @@ -251,14 +251,18 @@ private WindowManager initWindowManager(WindowLifecycleListener slidingIntervalDurationMs); evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDurationMs); - triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDurationMs, manager, - evictionPolicy, topoConf); + triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDurationMs); } else { throw new IllegalArgumentException( "If either a custom TriggerPolicy or EvictionPolicy is defined, both must be." ); } + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setTopologyConfig(topoConf); + triggerPolicy.setTriggerHandler(manager); + triggerPolicy.setWindowManager(manager); + manager.setEvictionPolicy(evictionPolicy); manager.setTriggerPolicy(triggerPolicy); // restore state if there is existing state @@ -305,22 +309,18 @@ private boolean isTupleTs() { @SuppressWarnings("HiddenField") private TriggerPolicy getTriggerPolicy(Count slidingIntervalCount, Long - slidingIntervalDurationMs, WindowManager manager, EvictionPolicy - evictionPolicy, Map topoConf) { + slidingIntervalDurationMs) { if (slidingIntervalCount != null) { if (isTupleTs()) { - return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, - evictionPolicy, manager); + return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value); } else { - return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy); + return new CountTriggerPolicy<>(slidingIntervalCount.value); } } else { if (isTupleTs()) { - return new WatermarkTimeTriggerPolicy<>(slidingIntervalDurationMs, manager, - evictionPolicy, manager); + return new WatermarkTimeTriggerPolicy<>(slidingIntervalDurationMs); } else { - return new TimeTriggerPolicy<>(slidingIntervalDurationMs, manager, - evictionPolicy, topoConf); + return new TimeTriggerPolicy<>(slidingIntervalDurationMs); } } } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/TriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/TriggerPolicy.java index 5fe6a0055a7..4043f003a0d 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/TriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/TriggerPolicy.java @@ -30,10 +30,10 @@ * limitations under the License. */ - package com.twitter.heron.api.windowing; import java.io.Serializable; +import java.util.Map; /** * Triggers the window calculations based on the policy. @@ -80,4 +80,32 @@ public interface TriggerPolicy { * @param state the state */ void restoreState(S state); + + /** + * Set the eviction policy to whatever eviction policy to use this with + * + * @param evictionPolicy the eviction policy + */ + void setEvictionPolicy(EvictionPolicy evictionPolicy); + + /** + * Set the trigger handler for this trigger policy to trigger + * + * @param triggerHandler the trigger handler + */ + void setTriggerHandler(TriggerHandler triggerHandler); + + /** + * Sets the window manager that uses this trigger policy + * + * @param windowManager the window manager + */ + void setWindowManager(WindowManager windowManager); + + /** + * Sets the Config used for this topology + * + * @param config the configuration policy + */ + void setTopologyConfig(Map config); } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java new file mode 100644 index 00000000000..1584027ae4c --- /dev/null +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java @@ -0,0 +1,75 @@ +package com.twitter.heron.api.windowing.triggers; + +import java.io.Serializable; +import java.util.Map; + +import com.twitter.heron.api.windowing.EvictionPolicy; +import com.twitter.heron.api.windowing.TriggerHandler; +import com.twitter.heron.api.windowing.TriggerPolicy; +import com.twitter.heron.api.windowing.WindowManager; + +public abstract class AbstractBaseTriggerPolicy + implements TriggerPolicy { + protected TriggerHandler handler; + protected EvictionPolicy evictionPolicy; + protected WindowManager windowManager; + protected Boolean started; + protected Map topoConf; + + /** + * Set the eviction policy to whatever eviction policy to use this with + * + * @param evictionPolicy the eviction policy + */ + public void setEvictionPolicy(EvictionPolicy evictionPolicy){ + this.evictionPolicy = evictionPolicy; + } + + /** + * Set the trigger handler for this trigger policy to trigger + * + * @param triggerHandler the trigger handler + */ + public void setTriggerHandler(TriggerHandler triggerHandler){ + this.handler = triggerHandler; + } + + /** + * Sets the window manager that uses this trigger policy + * + * @param windowManager the window manager + */ + public void setWindowManager(WindowManager windowManager){ + this.windowManager = windowManager; + } + + /** + * Sets the Config used for this topology + * + * @param config the configuration object + */ + public void setTopologyConfig(Map config){ + this.topoConf = config; + } + + /** + * Starts the trigger policy. This can be used + * during recovery to start the triggers after + * recovery is complete. + */ + public void start(){ + if(this.evictionPolicy == null){ + throw new RuntimeException("EvictionPolicy of TriggerPolicy was not set."); + } + + if(this.handler == null){ + throw new RuntimeException("TriggerHandler of TriggerPolicy was not set."); + } + + if(this.windowManager == null){ + throw new RuntimeException("WindowManager of TriggerPolicy was not set."); + } + + started = true; + } +} diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java index 9fb679133be..110dbb0be7d 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java @@ -37,9 +37,7 @@ import com.twitter.heron.api.windowing.DefaultEvictionContext; import com.twitter.heron.api.windowing.Event; -import com.twitter.heron.api.windowing.EvictionPolicy; import com.twitter.heron.api.windowing.TriggerHandler; -import com.twitter.heron.api.windowing.TriggerPolicy; /** * A trigger that tracks event counts and calls back {@link TriggerHandler#onTrigger()} @@ -47,19 +45,15 @@ * * @param the type of event tracked by this policy. */ -public class CountTriggerPolicy implements TriggerPolicy { +public class CountTriggerPolicy extends + AbstractBaseTriggerPolicy { private final int count; private final AtomicInteger currentCount; - private final TriggerHandler handler; - private final EvictionPolicy evictionPolicy; private boolean started; - public CountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy - evictionPolicy) { + public CountTriggerPolicy(int count) { this.count = count; this.currentCount = new AtomicInteger(); - this.handler = handler; - this.evictionPolicy = evictionPolicy; this.started = false; } @@ -78,11 +72,6 @@ public void reset() { currentCount.set(0); } - @Override - public void start() { - started = true; - } - @Override public void shutdown() { // NOOP diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java index e07b3284bf8..1c5002ff423 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java @@ -39,32 +39,17 @@ import com.twitter.heron.api.Config; import com.twitter.heron.api.windowing.DefaultEvictionContext; import com.twitter.heron.api.windowing.Event; -import com.twitter.heron.api.windowing.EvictionPolicy; import com.twitter.heron.api.windowing.TriggerHandler; -import com.twitter.heron.api.windowing.TriggerPolicy; /** * Invokes {@link TriggerHandler#onTrigger()} after the duration. */ -public class TimeTriggerPolicy implements TriggerPolicy { - +public class TimeTriggerPolicy extends AbstractBaseTriggerPolicy { private long duration; - private final TriggerHandler handler; - private final EvictionPolicy evictionPolicy; - private Map topoConf; - - - public TimeTriggerPolicy(long millis, TriggerHandler handler) { - this(millis, handler, null, new Config()); - } - public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy - evictionPolicy, Map topoConf) { + public TimeTriggerPolicy(long millis) { this.duration = millis; - this.handler = handler; - this.evictionPolicy = evictionPolicy; - this.topoConf = topoConf; } @Override @@ -79,6 +64,7 @@ public void reset() { @Override public void start() { + super.start(); Config.registerTopologyTimerEvents(this.topoConf, "TimeTriggerPolicyTimer", Duration.ofMillis(this.duration), () -> triggerTask()); } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java index 6c7a59dd771..98e252fbfb6 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java @@ -37,10 +37,6 @@ import com.twitter.heron.api.windowing.DefaultEvictionContext; import com.twitter.heron.api.windowing.Event; -import com.twitter.heron.api.windowing.EvictionPolicy; -import com.twitter.heron.api.windowing.TriggerHandler; -import com.twitter.heron.api.windowing.TriggerPolicy; -import com.twitter.heron.api.windowing.WindowManager; /** * A trigger policy that tracks event counts and sets the context for @@ -48,21 +44,13 @@ * * @param the type of event tracked by this policy. */ -public class WatermarkCountTriggerPolicy implements TriggerPolicy { +public class WatermarkCountTriggerPolicy extends + AbstractBaseTriggerPolicy { private final int count; - private final TriggerHandler handler; - private final EvictionPolicy evictionPolicy; - private final WindowManager windowManager; private volatile long lastProcessedTs; - private boolean started; - public WatermarkCountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy - evictionPolicy, WindowManager windowManager) { + public WatermarkCountTriggerPolicy(int count) { this.count = count; - this.handler = handler; - this.evictionPolicy = evictionPolicy; - this.windowManager = windowManager; - this.started = false; } @Override @@ -77,11 +65,6 @@ public void reset() { // NOOP } - @Override - public void start() { - started = true; - } - @Override public void shutdown() { // NOOP diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java index 5daba8b2e28..7bd4029a6a0 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java @@ -37,32 +37,20 @@ import com.twitter.heron.api.windowing.DefaultEvictionContext; import com.twitter.heron.api.windowing.Event; -import com.twitter.heron.api.windowing.EvictionPolicy; import com.twitter.heron.api.windowing.TriggerHandler; -import com.twitter.heron.api.windowing.TriggerPolicy; -import com.twitter.heron.api.windowing.WindowManager; /** * Handles watermark events and triggers {@link TriggerHandler#onTrigger()} for each window * interval that has events to be processed up to the watermark ts. */ -public class WatermarkTimeTriggerPolicy implements TriggerPolicy { +public class WatermarkTimeTriggerPolicy extends + AbstractBaseTriggerPolicy { private static final Logger LOG = Logger.getLogger(WatermarkTimeTriggerPolicy.class.getName()); private final long slidingIntervalMs; - private final TriggerHandler handler; - private final EvictionPolicy evictionPolicy; - private final WindowManager windowManager; private volatile long nextWindowEndTs; - private boolean started; - public WatermarkTimeTriggerPolicy(long slidingIntervalMs, TriggerHandler handler, - EvictionPolicy evictionPolicy, WindowManager - windowManager) { + public WatermarkTimeTriggerPolicy(long slidingIntervalMs) { this.slidingIntervalMs = slidingIntervalMs; - this.handler = handler; - this.evictionPolicy = evictionPolicy; - this.windowManager = windowManager; - this.started = false; } @Override @@ -77,11 +65,6 @@ public void reset() { // NOOP } - @Override - public void start() { - started = true; - } - @Override public void shutdown() { // NOOP From 1ef4279629e100baceb9b84fd31ba4c8c6661a27 Mon Sep 17 00:00:00 2001 From: dancollins34 <33334385+dancollins34@users.noreply.github.com> Date: Sun, 7 Jan 2018 23:15:23 -0500 Subject: [PATCH 06/13] Added Changes necessary to allow custom trigger policies to attach WindowManager, Config object, TriggerHandler and EvictionPolicy --- .../triggers/AbstractBaseTriggerPolicy.java | 162 ++++++++++++------ .../triggers/CountTriggerPolicy.java | 2 + .../windowing/triggers/TimeTriggerPolicy.java | 3 +- .../triggers/WatermarkCountTriggerPolicy.java | 1 + .../triggers/WatermarkTimeTriggerPolicy.java | 1 + .../api/windowing/WindowManagerTest.java | 69 +++++--- 6 files changed, 159 insertions(+), 79 deletions(-) diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java index 1584027ae4c..3f6300ca338 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java @@ -1,3 +1,35 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.twitter.heron.api.windowing.triggers; import java.io.Serializable; @@ -8,68 +40,88 @@ import com.twitter.heron.api.windowing.TriggerPolicy; import com.twitter.heron.api.windowing.WindowManager; + public abstract class AbstractBaseTriggerPolicy - implements TriggerPolicy { - protected TriggerHandler handler; - protected EvictionPolicy evictionPolicy; - protected WindowManager windowManager; - protected Boolean started; - protected Map topoConf; - - /** - * Set the eviction policy to whatever eviction policy to use this with - * - * @param evictionPolicy the eviction policy - */ - public void setEvictionPolicy(EvictionPolicy evictionPolicy){ - this.evictionPolicy = evictionPolicy; - } + implements TriggerPolicy { + protected TriggerHandler handler; + protected EvictionPolicy evictionPolicy; + protected WindowManager windowManager; + protected Boolean started; + protected Map topoConf; + + private boolean requiresEvictionPolicy = false; + private boolean requiresWindowManager = false; + private boolean requiresTopologyConfig = false; + + /** + * Set the requirements in the constructor + */ + public AbstractBaseTriggerPolicy(boolean requiresEvictionPolicy, + boolean requiresWindowManager, + boolean requiresTopologyConfig) { + this.requiresEvictionPolicy = requiresEvictionPolicy; + this.requiresWindowManager = requiresWindowManager; + this.requiresTopologyConfig = requiresTopologyConfig; + } - /** - * Set the trigger handler for this trigger policy to trigger - * - * @param triggerHandler the trigger handler - */ - public void setTriggerHandler(TriggerHandler triggerHandler){ - this.handler = triggerHandler; + /** + * Set the eviction policy to whatever eviction policy to use this with + * + * @param evictionPolicy the eviction policy + */ + public void setEvictionPolicy(EvictionPolicy evictionPolicy) { + this.evictionPolicy = evictionPolicy; + } + + /** + * Set the trigger handler for this trigger policy to trigger + * + * @param triggerHandler the trigger handler + */ + public void setTriggerHandler(TriggerHandler triggerHandler) { + this.handler = triggerHandler; + } + + /** + * Sets the window manager that uses this trigger policy + * + * @param windowManager the window manager + */ + public void setWindowManager(WindowManager windowManager) { + this.windowManager = windowManager; + } + + /** + * Sets the Config used for this topology + * + * @param config the configuration object + */ + public void setTopologyConfig(Map config) { + this.topoConf = config; + } + + /** + * Starts the trigger policy. This can be used + * during recovery to start the triggers after + * recovery is complete. + */ + public void start() { + if (this.evictionPolicy == null && this.requiresEvictionPolicy) { + throw new RuntimeException("EvictionPolicy of TriggerPolicy was not set."); } - /** - * Sets the window manager that uses this trigger policy - * - * @param windowManager the window manager - */ - public void setWindowManager(WindowManager windowManager){ - this.windowManager = windowManager; + if (this.handler == null) { + throw new RuntimeException("TriggerHandler of TriggerPolicy was not set."); } - /** - * Sets the Config used for this topology - * - * @param config the configuration object - */ - public void setTopologyConfig(Map config){ - this.topoConf = config; + if (this.windowManager == null && this.requiresWindowManager) { + throw new RuntimeException("WindowManager of TriggerPolicy was not set."); } - /** - * Starts the trigger policy. This can be used - * during recovery to start the triggers after - * recovery is complete. - */ - public void start(){ - if(this.evictionPolicy == null){ - throw new RuntimeException("EvictionPolicy of TriggerPolicy was not set."); - } - - if(this.handler == null){ - throw new RuntimeException("TriggerHandler of TriggerPolicy was not set."); - } - - if(this.windowManager == null){ - throw new RuntimeException("WindowManager of TriggerPolicy was not set."); - } - - started = true; + if (this.topoConf == null && this.requiresTopologyConfig) { + throw new RuntimeException("WindowManager of TriggerPolicy was not set."); } + + started = true; + } } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java index 110dbb0be7d..bc01ed6c790 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java @@ -52,6 +52,8 @@ public class CountTriggerPolicy extends private boolean started; public CountTriggerPolicy(int count) { + super(true, false, false); + this.count = count; this.currentCount = new AtomicInteger(); this.started = false; diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java index 1c5002ff423..e2d823f8819 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java @@ -34,7 +34,6 @@ import java.io.Serializable; import java.time.Duration; -import java.util.Map; import com.twitter.heron.api.Config; import com.twitter.heron.api.windowing.DefaultEvictionContext; @@ -49,6 +48,8 @@ public class TimeTriggerPolicy extends AbstractBaseTrigg private long duration; public TimeTriggerPolicy(long millis) { + super(true, false, true); + this.duration = millis; } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java index 98e252fbfb6..9964a1dce39 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java @@ -50,6 +50,7 @@ public class WatermarkCountTriggerPolicy extends private volatile long lastProcessedTs; public WatermarkCountTriggerPolicy(int count) { + super(true, true, false); this.count = count; } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java index 7bd4029a6a0..24347e3a192 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java @@ -50,6 +50,7 @@ public class WatermarkTimeTriggerPolicy extends private volatile long nextWindowEndTs; public WatermarkTimeTriggerPolicy(long slidingIntervalMs) { + super(true, true, false); this.slidingIntervalMs = slidingIntervalMs; } diff --git a/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java b/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java index 5c6be6eea53..5141079cf10 100644 --- a/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java +++ b/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java @@ -102,8 +102,9 @@ public void tearDown() { @Test public void testCountBasedWindow() throws Exception { EvictionPolicy evictionPolicy = new CountEvictionPolicy(5); - TriggerPolicy triggerPolicy = new CountTriggerPolicy(2, windowManager, - evictionPolicy); + TriggerPolicy triggerPolicy = new CountTriggerPolicy(2); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); triggerPolicy.start(); windowManager.setEvictionPolicy(evictionPolicy); windowManager.setTriggerPolicy(triggerPolicy); @@ -146,7 +147,8 @@ public void testExpireThreshold() throws Exception { int windowLength = 5; windowManager.setEvictionPolicy(new CountEvictionPolicy(5)); TriggerPolicy triggerPolicy = new TimeTriggerPolicy(Duration.ofHours(1) - .toMillis(), windowManager); + .toMillis()); + triggerPolicy.setTriggerHandler(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); for (int i : seq(1, 5)) { @@ -178,8 +180,10 @@ private void testEvictBeforeWatermarkForWatermarkEvictionPolicy(EvictionPolicy */ int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD; windowManager.setEvictionPolicy(watermarkEvictionPolicy); - WatermarkCountTriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(windowLength, - windowManager, watermarkEvictionPolicy, windowManager); + WatermarkCountTriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(windowLength); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(watermarkEvictionPolicy); + triggerPolicy.setWindowManager(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); for (int i : seq(1, threshold)) { @@ -226,7 +230,10 @@ public void testTimeBasedWindow() throws Exception { * Set it to a large value and trigger manually. */ TriggerPolicy triggerPolicy = new TimeTriggerPolicy(Duration.ofDays(1) - .toMillis(), windowManager, evictionPolicy, new Config()); + .toMillis()); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setTopologyConfig(new Config()); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); long now = System.currentTimeMillis(); @@ -294,7 +301,8 @@ public void testTimeBasedWindowExpiry() throws Exception { * Set it to a large value and trigger manually. */ TriggerPolicy triggerPolicy = new TimeTriggerPolicy(Duration.ofDays(1) - .toMillis(), windowManager); + .toMillis()); + triggerPolicy.setTriggerHandler(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); long now = System.currentTimeMillis(); @@ -327,8 +335,9 @@ public void testTimeBasedWindowExpiry() throws Exception { public void testTumblingWindow() throws Exception { EvictionPolicy evictionPolicy = new CountEvictionPolicy(3); windowManager.setEvictionPolicy(evictionPolicy); - TriggerPolicy triggerPolicy = new CountTriggerPolicy(3, windowManager, - evictionPolicy); + TriggerPolicy triggerPolicy = new CountTriggerPolicy(3); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); windowManager.add(1); @@ -358,8 +367,10 @@ public void testTumblingWindow() throws Exception { public void testEventTimeBasedWindow() throws Exception { EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy<>(20); windowManager.setEvictionPolicy(evictionPolicy); - TriggerPolicy triggerPolicy = new WatermarkTimeTriggerPolicy(10, - windowManager, evictionPolicy, windowManager); + TriggerPolicy triggerPolicy = new WatermarkTimeTriggerPolicy(10); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setWindowManager(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); @@ -425,8 +436,10 @@ public void testEventTimeBasedWindow() throws Exception { public void testCountBasedWindowWithEventTs() throws Exception { EvictionPolicy evictionPolicy = new WatermarkCountEvictionPolicy<>(3); windowManager.setEvictionPolicy(evictionPolicy); - TriggerPolicy triggerPolicy = new WatermarkTimeTriggerPolicy(10, - windowManager, evictionPolicy, windowManager); + TriggerPolicy triggerPolicy = new WatermarkTimeTriggerPolicy(10); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setWindowManager(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); @@ -465,8 +478,10 @@ public void testCountBasedWindowWithEventTs() throws Exception { public void testCountBasedTriggerWithEventTs() throws Exception { EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy(20); windowManager.setEvictionPolicy(evictionPolicy); - TriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(3, - windowManager, evictionPolicy, windowManager); + TriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(3); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setWindowManager(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); @@ -506,8 +521,10 @@ public void testCountBasedTriggerWithEventTs() throws Exception { public void testCountBasedTumblingWithSameEventTs() throws Exception { EvictionPolicy evictionPolicy = new WatermarkCountEvictionPolicy<>(2); windowManager.setEvictionPolicy(evictionPolicy); - TriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(2, - windowManager, evictionPolicy, windowManager); + TriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(2); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setWindowManager(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); @@ -535,8 +552,10 @@ public void testCountBasedTumblingWithSameEventTs() throws Exception { public void testCountBasedSlidingWithSameEventTs() throws Exception { EvictionPolicy evictionPolicy = new WatermarkCountEvictionPolicy<>(5); windowManager.setEvictionPolicy(evictionPolicy); - TriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(2, - windowManager, evictionPolicy, windowManager); + TriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(2); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setWindowManager(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); @@ -565,8 +584,10 @@ public void testCountBasedSlidingWithSameEventTs() throws Exception { public void testEventTimeLag() throws Exception { EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy<>(20, 5); windowManager.setEvictionPolicy(evictionPolicy); - TriggerPolicy triggerPolicy = new WatermarkTimeTriggerPolicy(10, - windowManager, evictionPolicy, windowManager); + TriggerPolicy triggerPolicy = new WatermarkTimeTriggerPolicy(10); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setWindowManager(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); @@ -601,8 +622,10 @@ public Action evict(Event event) { }; windowManager.setEvictionPolicy(evictionPolicy); - TriggerPolicy triggerPolicy = new WatermarkTimeTriggerPolicy(10, - windowManager, evictionPolicy, windowManager); + TriggerPolicy triggerPolicy = new WatermarkTimeTriggerPolicy(10); + triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setWindowManager(windowManager); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); From 5431c76cce6a13daedf8da3f36e93c73785a76a9 Mon Sep 17 00:00:00 2001 From: dancollins34 <33334385+dancollins34@users.noreply.github.com> Date: Thu, 11 Jan 2018 02:30:57 -0500 Subject: [PATCH 07/13] Fixed test errors due to lack of setting necessary paramters and defining "started" again for CountTriggerPolicy --- .../api/windowing/triggers/AbstractBaseTriggerPolicy.java | 1 + .../heron/api/windowing/triggers/CountTriggerPolicy.java | 2 -- .../com/twitter/heron/api/windowing/WindowManagerTest.java | 7 ++++++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java index 3f6300ca338..d3ead04bc37 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java @@ -105,6 +105,7 @@ public void setTopologyConfig(Map config) { * during recovery to start the triggers after * recovery is complete. */ + @Override public void start() { if (this.evictionPolicy == null && this.requiresEvictionPolicy) { throw new RuntimeException("EvictionPolicy of TriggerPolicy was not set."); diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java index bc01ed6c790..d729bc6b5aa 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java @@ -49,14 +49,12 @@ public class CountTriggerPolicy extends AbstractBaseTriggerPolicy { private final int count; private final AtomicInteger currentCount; - private boolean started; public CountTriggerPolicy(int count) { super(true, false, false); this.count = count; this.currentCount = new AtomicInteger(); - this.started = false; } @Override diff --git a/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java b/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java index 5141079cf10..46bbc23108d 100644 --- a/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java +++ b/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java @@ -145,10 +145,13 @@ public void testCountBasedWindow() throws Exception { public void testExpireThreshold() throws Exception { int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD; int windowLength = 5; - windowManager.setEvictionPolicy(new CountEvictionPolicy(5)); + CountEvictionPolicy countEvictionPolicy = new CountEvictionPolicy(5); + windowManager.setEvictionPolicy(countEvictionPolicy); TriggerPolicy triggerPolicy = new TimeTriggerPolicy(Duration.ofHours(1) .toMillis()); + triggerPolicy.setEvictionPolicy(countEvictionPolicy); triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setTopologyConfig(new Config()); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); for (int i : seq(1, 5)) { @@ -303,6 +306,8 @@ public void testTimeBasedWindowExpiry() throws Exception { TriggerPolicy triggerPolicy = new TimeTriggerPolicy(Duration.ofDays(1) .toMillis()); triggerPolicy.setTriggerHandler(windowManager); + triggerPolicy.setEvictionPolicy(evictionPolicy); + triggerPolicy.setTopologyConfig(new Config()); triggerPolicy.start(); windowManager.setTriggerPolicy(triggerPolicy); long now = System.currentTimeMillis(); From 0098110b33530610881afc9d50cc53c82d381c9f Mon Sep 17 00:00:00 2001 From: dancollins34 <33334385+dancollins34@users.noreply.github.com> Date: Wed, 17 Jan 2018 21:09:32 -0500 Subject: [PATCH 08/13] Remove flags and checks on AbstractBaseTriggerPolicy --- .../triggers/AbstractBaseTriggerPolicy.java | 23 +------------------ .../triggers/CountTriggerPolicy.java | 2 +- .../windowing/triggers/TimeTriggerPolicy.java | 2 +- .../triggers/WatermarkCountTriggerPolicy.java | 2 +- .../triggers/WatermarkTimeTriggerPolicy.java | 2 +- 5 files changed, 5 insertions(+), 26 deletions(-) diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java index d3ead04bc37..3cc1f599195 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/AbstractBaseTriggerPolicy.java @@ -56,12 +56,7 @@ public abstract class AbstractBaseTriggerPolicy /** * Set the requirements in the constructor */ - public AbstractBaseTriggerPolicy(boolean requiresEvictionPolicy, - boolean requiresWindowManager, - boolean requiresTopologyConfig) { - this.requiresEvictionPolicy = requiresEvictionPolicy; - this.requiresWindowManager = requiresWindowManager; - this.requiresTopologyConfig = requiresTopologyConfig; + public AbstractBaseTriggerPolicy() { } /** @@ -107,22 +102,6 @@ public void setTopologyConfig(Map config) { */ @Override public void start() { - if (this.evictionPolicy == null && this.requiresEvictionPolicy) { - throw new RuntimeException("EvictionPolicy of TriggerPolicy was not set."); - } - - if (this.handler == null) { - throw new RuntimeException("TriggerHandler of TriggerPolicy was not set."); - } - - if (this.windowManager == null && this.requiresWindowManager) { - throw new RuntimeException("WindowManager of TriggerPolicy was not set."); - } - - if (this.topoConf == null && this.requiresTopologyConfig) { - throw new RuntimeException("WindowManager of TriggerPolicy was not set."); - } - started = true; } } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java index d729bc6b5aa..eed03d89d40 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java @@ -51,7 +51,7 @@ public class CountTriggerPolicy extends private final AtomicInteger currentCount; public CountTriggerPolicy(int count) { - super(true, false, false); + super(); this.count = count; this.currentCount = new AtomicInteger(); diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java index e2d823f8819..89d100c7097 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java @@ -48,7 +48,7 @@ public class TimeTriggerPolicy extends AbstractBaseTrigg private long duration; public TimeTriggerPolicy(long millis) { - super(true, false, true); + super(); this.duration = millis; } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java index 9964a1dce39..c6c0e48a12f 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkCountTriggerPolicy.java @@ -50,7 +50,7 @@ public class WatermarkCountTriggerPolicy extends private volatile long lastProcessedTs; public WatermarkCountTriggerPolicy(int count) { - super(true, true, false); + super(); this.count = count; } diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java index 24347e3a192..acf50cc96e9 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/WatermarkTimeTriggerPolicy.java @@ -50,7 +50,7 @@ public class WatermarkTimeTriggerPolicy extends private volatile long nextWindowEndTs; public WatermarkTimeTriggerPolicy(long slidingIntervalMs) { - super(true, true, false); + super(); this.slidingIntervalMs = slidingIntervalMs; } From 1d15f2db4944326f2e8430c4ae9fa9fdc9b2d145 Mon Sep 17 00:00:00 2001 From: dancollins34 Date: Sat, 3 Feb 2018 15:40:36 -0500 Subject: [PATCH 09/13] Add the ability to simulate streamlet topologies --- heron/api/src/java/BUILD | 7 +++--- .../com/twitter/heron/streamlet/Config.java | 25 ++++++++++++++++++- .../com/twitter/heron/streamlet/Runner.java | 20 +++++++++++---- 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD index dca6d5734bb..28ec978de24 100644 --- a/heron/api/src/java/BUILD +++ b/heron/api/src/java/BUILD @@ -15,7 +15,7 @@ java_doc( api_deps_files = \ heron_java_api_proto_files() + [ ":classification", - "//heron/common/src/java:basics-java", + "//heron/common/src/java:basics-java" ] # Low Level Api @@ -34,6 +34,7 @@ java_library( deps = api_deps_files + [ ":api-java-low-level", "//third_party/java:kryo-neverlink", + "//heron/simulator/src/java:simulator-java" ] ) @@ -42,13 +43,13 @@ java_library( name = "api-java-low-level-functional", javacopts = DOCLINT_HTML_AND_SYNTAX, srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]), - deps = api_deps_files + ["//third_party/java:kryo-neverlink"] + deps = api_deps_files + ["//third_party/java:kryo-neverlink", "//heron/simulator/src/java:simulator-java"] ) java_binary( name = "api-unshaded", srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]), - deps = api_deps_files + ["//third_party/java:kryo-neverlink"], + deps = api_deps_files + ["//third_party/java:kryo-neverlink", "//heron/simulator/src/java:simulator-java"], ) jarjar_binary( diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Config.java b/heron/api/src/java/com/twitter/heron/streamlet/Config.java index d856796f1b0..45f224e88b8 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/Config.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/Config.java @@ -31,9 +31,11 @@ public final class Config implements Serializable { private final DeliverySemantics deliverySemantics; private final Serializer serializer; private com.twitter.heron.api.Config heronConfig; + private final boolean shouldSimulate; private static final long MB = 1024 * 1024; private static final long GB = 1024 * MB; + /** * An enum encapsulating the delivery semantics that can be applied to Heron topologies. The * options are currently: at most once, at least once, or effectively once. @@ -60,6 +62,7 @@ private static class Defaults { static final long RAM = 100 * MB; static final DeliverySemantics SEMANTICS = DeliverySemantics.ATMOST_ONCE; static final Serializer SERIALIZER = Serializer.KRYO; + static final boolean shouldSimulate = false; } private Config(Builder builder) { @@ -68,6 +71,7 @@ private Config(Builder builder) { cpu = builder.cpu; ram = builder.ram; deliverySemantics = builder.deliverySemantics; + shouldSimulate = builder.shouldSimulate; } /** @@ -147,6 +151,14 @@ public Serializer getSerializer() { return serializer; } + /** + * Gets whether this should be simulated + * @return whether this should be simulated + */ + public boolean shouldSimulate() { + return shouldSimulate; + } + private static com.twitter.heron.api.Config.TopologyReliabilityMode translateSemantics( DeliverySemantics semantics) { switch (semantics) { @@ -167,13 +179,15 @@ public static final class Builder { private long ram; private DeliverySemantics deliverySemantics; private Serializer serializer; + private boolean shouldSimulate; private Builder() { config = Defaults.CONFIG; cpu = Defaults.CPU; ram = Defaults.RAM; deliverySemantics = Defaults.SEMANTICS; - serializer = Serializer.KRYO; + serializer = Defaults.SERIALIZER; + shouldSimulate = Defaults.shouldSimulate; } /** @@ -250,6 +264,15 @@ public Builder setUserConfig(String key, Object value) { return this; } + /** + * Sets whether this topology should be run in the simulator or not + * @param shouldSimulate whether this should be run in the simulator + */ + public Builder setShouldSimulate(boolean shouldSimulate) { + this.shouldSimulate = shouldSimulate; + return this; + } + private void useKryo() { try { config.setSerializationClassName(KryoSerializer.class.getName()); diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Runner.java b/heron/api/src/java/com/twitter/heron/streamlet/Runner.java index 05c572dfbab..8ca53dff8ff 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/Runner.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/Runner.java @@ -18,6 +18,7 @@ import com.twitter.heron.api.exception.AlreadyAliveException; import com.twitter.heron.api.exception.InvalidTopologyException; import com.twitter.heron.api.topology.TopologyBuilder; +import com.twitter.heron.simulator.Simulator; import com.twitter.heron.streamlet.impl.BuilderImpl; /** @@ -36,11 +37,20 @@ public Runner() { } public void run(String name, Config config, Builder builder) { BuilderImpl bldr = (BuilderImpl) builder; TopologyBuilder topologyBuilder = bldr.build(); - try { - HeronSubmitter.submitTopology(name, config.getHeronConfig(), - topologyBuilder.createTopology()); - } catch (AlreadyAliveException | InvalidTopologyException e) { - e.printStackTrace(); + + if (config.shouldSimulate()) { + new Simulator().submitTopology( + name, + config.getHeronConfig(), + topologyBuilder.createTopology() + ); + } else { + try { + HeronSubmitter.submitTopology(name, config.getHeronConfig(), + topologyBuilder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException e) { + e.printStackTrace(); + } } } } From 78d73ce810074382524944075a4c720f455f977d Mon Sep 17 00:00:00 2001 From: dancollins34 Date: Sat, 3 Feb 2018 16:52:55 -0500 Subject: [PATCH 10/13] Fixed checkstyle issues --- .../src/java/com/twitter/heron/streamlet/Config.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Config.java b/heron/api/src/java/com/twitter/heron/streamlet/Config.java index 45f224e88b8..e1cd9c13f5b 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/Config.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/Config.java @@ -62,7 +62,7 @@ private static class Defaults { static final long RAM = 100 * MB; static final DeliverySemantics SEMANTICS = DeliverySemantics.ATMOST_ONCE; static final Serializer SERIALIZER = Serializer.KRYO; - static final boolean shouldSimulate = false; + static final boolean SHOULDSIMULATE = false; } private Config(Builder builder) { @@ -187,7 +187,7 @@ private Builder() { ram = Defaults.RAM; deliverySemantics = Defaults.SEMANTICS; serializer = Defaults.SERIALIZER; - shouldSimulate = Defaults.shouldSimulate; + shouldSimulate = Defaults.SHOULDSIMULATE; } /** @@ -266,10 +266,10 @@ public Builder setUserConfig(String key, Object value) { /** * Sets whether this topology should be run in the simulator or not - * @param shouldSimulate whether this should be run in the simulator + * @param value whether this should be run in the simulator */ - public Builder setShouldSimulate(boolean shouldSimulate) { - this.shouldSimulate = shouldSimulate; + public Builder setShouldSimulate(boolean value) { + this.shouldSimulate = value; return this; } From 0c33a4f5f4c432663fd3cfb3a74d7429c0b107c6 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 7 Feb 2018 20:50:52 -0500 Subject: [PATCH 11/13] Revert "Fixed checkstyle issues" This reverts commit 78d73ce810074382524944075a4c720f455f977d. --- .../src/java/com/twitter/heron/streamlet/Config.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Config.java b/heron/api/src/java/com/twitter/heron/streamlet/Config.java index e1cd9c13f5b..45f224e88b8 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/Config.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/Config.java @@ -62,7 +62,7 @@ private static class Defaults { static final long RAM = 100 * MB; static final DeliverySemantics SEMANTICS = DeliverySemantics.ATMOST_ONCE; static final Serializer SERIALIZER = Serializer.KRYO; - static final boolean SHOULDSIMULATE = false; + static final boolean shouldSimulate = false; } private Config(Builder builder) { @@ -187,7 +187,7 @@ private Builder() { ram = Defaults.RAM; deliverySemantics = Defaults.SEMANTICS; serializer = Defaults.SERIALIZER; - shouldSimulate = Defaults.SHOULDSIMULATE; + shouldSimulate = Defaults.shouldSimulate; } /** @@ -266,10 +266,10 @@ public Builder setUserConfig(String key, Object value) { /** * Sets whether this topology should be run in the simulator or not - * @param value whether this should be run in the simulator + * @param shouldSimulate whether this should be run in the simulator */ - public Builder setShouldSimulate(boolean value) { - this.shouldSimulate = value; + public Builder setShouldSimulate(boolean shouldSimulate) { + this.shouldSimulate = shouldSimulate; return this; } From 9bab39fc4a9a192606e149159d248b4bcc94fccf Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 7 Feb 2018 20:50:58 -0500 Subject: [PATCH 12/13] Revert "Add the ability to simulate streamlet topologies" This reverts commit 1d15f2db4944326f2e8430c4ae9fa9fdc9b2d145. --- heron/api/src/java/BUILD | 7 +++--- .../com/twitter/heron/streamlet/Config.java | 25 +------------------ .../com/twitter/heron/streamlet/Runner.java | 20 ++++----------- 3 files changed, 9 insertions(+), 43 deletions(-) diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD index 28ec978de24..dca6d5734bb 100644 --- a/heron/api/src/java/BUILD +++ b/heron/api/src/java/BUILD @@ -15,7 +15,7 @@ java_doc( api_deps_files = \ heron_java_api_proto_files() + [ ":classification", - "//heron/common/src/java:basics-java" + "//heron/common/src/java:basics-java", ] # Low Level Api @@ -34,7 +34,6 @@ java_library( deps = api_deps_files + [ ":api-java-low-level", "//third_party/java:kryo-neverlink", - "//heron/simulator/src/java:simulator-java" ] ) @@ -43,13 +42,13 @@ java_library( name = "api-java-low-level-functional", javacopts = DOCLINT_HTML_AND_SYNTAX, srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]), - deps = api_deps_files + ["//third_party/java:kryo-neverlink", "//heron/simulator/src/java:simulator-java"] + deps = api_deps_files + ["//third_party/java:kryo-neverlink"] ) java_binary( name = "api-unshaded", srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]), - deps = api_deps_files + ["//third_party/java:kryo-neverlink", "//heron/simulator/src/java:simulator-java"], + deps = api_deps_files + ["//third_party/java:kryo-neverlink"], ) jarjar_binary( diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Config.java b/heron/api/src/java/com/twitter/heron/streamlet/Config.java index 45f224e88b8..d856796f1b0 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/Config.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/Config.java @@ -31,11 +31,9 @@ public final class Config implements Serializable { private final DeliverySemantics deliverySemantics; private final Serializer serializer; private com.twitter.heron.api.Config heronConfig; - private final boolean shouldSimulate; private static final long MB = 1024 * 1024; private static final long GB = 1024 * MB; - /** * An enum encapsulating the delivery semantics that can be applied to Heron topologies. The * options are currently: at most once, at least once, or effectively once. @@ -62,7 +60,6 @@ private static class Defaults { static final long RAM = 100 * MB; static final DeliverySemantics SEMANTICS = DeliverySemantics.ATMOST_ONCE; static final Serializer SERIALIZER = Serializer.KRYO; - static final boolean shouldSimulate = false; } private Config(Builder builder) { @@ -71,7 +68,6 @@ private Config(Builder builder) { cpu = builder.cpu; ram = builder.ram; deliverySemantics = builder.deliverySemantics; - shouldSimulate = builder.shouldSimulate; } /** @@ -151,14 +147,6 @@ public Serializer getSerializer() { return serializer; } - /** - * Gets whether this should be simulated - * @return whether this should be simulated - */ - public boolean shouldSimulate() { - return shouldSimulate; - } - private static com.twitter.heron.api.Config.TopologyReliabilityMode translateSemantics( DeliverySemantics semantics) { switch (semantics) { @@ -179,15 +167,13 @@ public static final class Builder { private long ram; private DeliverySemantics deliverySemantics; private Serializer serializer; - private boolean shouldSimulate; private Builder() { config = Defaults.CONFIG; cpu = Defaults.CPU; ram = Defaults.RAM; deliverySemantics = Defaults.SEMANTICS; - serializer = Defaults.SERIALIZER; - shouldSimulate = Defaults.shouldSimulate; + serializer = Serializer.KRYO; } /** @@ -264,15 +250,6 @@ public Builder setUserConfig(String key, Object value) { return this; } - /** - * Sets whether this topology should be run in the simulator or not - * @param shouldSimulate whether this should be run in the simulator - */ - public Builder setShouldSimulate(boolean shouldSimulate) { - this.shouldSimulate = shouldSimulate; - return this; - } - private void useKryo() { try { config.setSerializationClassName(KryoSerializer.class.getName()); diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Runner.java b/heron/api/src/java/com/twitter/heron/streamlet/Runner.java index 8ca53dff8ff..05c572dfbab 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/Runner.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/Runner.java @@ -18,7 +18,6 @@ import com.twitter.heron.api.exception.AlreadyAliveException; import com.twitter.heron.api.exception.InvalidTopologyException; import com.twitter.heron.api.topology.TopologyBuilder; -import com.twitter.heron.simulator.Simulator; import com.twitter.heron.streamlet.impl.BuilderImpl; /** @@ -37,20 +36,11 @@ public Runner() { } public void run(String name, Config config, Builder builder) { BuilderImpl bldr = (BuilderImpl) builder; TopologyBuilder topologyBuilder = bldr.build(); - - if (config.shouldSimulate()) { - new Simulator().submitTopology( - name, - config.getHeronConfig(), - topologyBuilder.createTopology() - ); - } else { - try { - HeronSubmitter.submitTopology(name, config.getHeronConfig(), - topologyBuilder.createTopology()); - } catch (AlreadyAliveException | InvalidTopologyException e) { - e.printStackTrace(); - } + try { + HeronSubmitter.submitTopology(name, config.getHeronConfig(), + topologyBuilder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException e) { + e.printStackTrace(); } } } From cfa9c4303fafebfbe91afd4529246897f191ea48 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 7 Feb 2018 22:53:32 -0500 Subject: [PATCH 13/13] Retry in an attempt to remove the dependency to simulator from api --- .../com/twitter/heron/api/HeronSubmitter.java | 9 ++++++-- .../twitter/heron/api/TopologySubmitter.java | 22 +++++++++++++++++++ .../com/twitter/heron/streamlet/Runner.java | 12 ++++++++-- .../twitter/heron/simulator/Simulator.java | 9 +++++++- 4 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 heron/api/src/java/com/twitter/heron/api/TopologySubmitter.java diff --git a/heron/api/src/java/com/twitter/heron/api/HeronSubmitter.java b/heron/api/src/java/com/twitter/heron/api/HeronSubmitter.java index 1048f9e7a87..719df40479c 100644 --- a/heron/api/src/java/com/twitter/heron/api/HeronSubmitter.java +++ b/heron/api/src/java/com/twitter/heron/api/HeronSubmitter.java @@ -52,10 +52,10 @@ * with the "heron jar" command from the command-line, and then use this class to * submit your topologies. */ -public final class HeronSubmitter { +public final class HeronSubmitter implements TopologySubmitter { private static final Logger LOG = Logger.getLogger(HeronSubmitter.class.getName()); - private HeronSubmitter() { + public HeronSubmitter() { } /** @@ -70,6 +70,11 @@ private HeronSubmitter() { */ public static void submitTopology(String name, Config heronConfig, HeronTopology topology) throws AlreadyAliveException, InvalidTopologyException { + new HeronSubmitter().submitTopologyInherited(name, heronConfig, topology); + } + + public void submitTopologyInherited(String name, Config heronConfig, HeronTopology topology) + throws AlreadyAliveException, InvalidTopologyException { Map heronCmdOptions = Utils.readCommandLineOpts(); // We would read the topology initial state from arguments from heron-cli diff --git a/heron/api/src/java/com/twitter/heron/api/TopologySubmitter.java b/heron/api/src/java/com/twitter/heron/api/TopologySubmitter.java new file mode 100644 index 00000000000..88952785b72 --- /dev/null +++ b/heron/api/src/java/com/twitter/heron/api/TopologySubmitter.java @@ -0,0 +1,22 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.api; + +import com.twitter.heron.api.exception.AlreadyAliveException; +import com.twitter.heron.api.exception.InvalidTopologyException; + +public interface TopologySubmitter { + void submitTopologyInherited(String name, Config heronConfig, HeronTopology topology) + throws AlreadyAliveException, InvalidTopologyException; +} diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Runner.java b/heron/api/src/java/com/twitter/heron/streamlet/Runner.java index 05c572dfbab..091c507872a 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/Runner.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/Runner.java @@ -15,6 +15,7 @@ package com.twitter.heron.streamlet; import com.twitter.heron.api.HeronSubmitter; +import com.twitter.heron.api.TopologySubmitter; import com.twitter.heron.api.exception.AlreadyAliveException; import com.twitter.heron.api.exception.InvalidTopologyException; import com.twitter.heron.api.topology.TopologyBuilder; @@ -34,11 +35,18 @@ public Runner() { } * @param builder The builder used to keep track of the sources. */ public void run(String name, Config config, Builder builder) { + this.run(name, config, builder, new HeronSubmitter()); + } + + public void run(String name, Config config, Builder builder, TopologySubmitter submitter) { BuilderImpl bldr = (BuilderImpl) builder; TopologyBuilder topologyBuilder = bldr.build(); try { - HeronSubmitter.submitTopology(name, config.getHeronConfig(), - topologyBuilder.createTopology()); + submitter.submitTopologyInherited( + name, + config.getHeronConfig(), + topologyBuilder.createTopology() + ); } catch (AlreadyAliveException | InvalidTopologyException e) { e.printStackTrace(); } diff --git a/heron/simulator/src/java/com/twitter/heron/simulator/Simulator.java b/heron/simulator/src/java/com/twitter/heron/simulator/Simulator.java index 9053a21f03a..7dd43e54e2d 100644 --- a/heron/simulator/src/java/com/twitter/heron/simulator/Simulator.java +++ b/heron/simulator/src/java/com/twitter/heron/simulator/Simulator.java @@ -24,6 +24,7 @@ import com.twitter.heron.api.Config; import com.twitter.heron.api.HeronTopology; +import com.twitter.heron.api.TopologySubmitter; import com.twitter.heron.api.generated.TopologyAPI; import com.twitter.heron.api.utils.TopologyUtils; import com.twitter.heron.common.basics.ByteAmount; @@ -40,7 +41,7 @@ * One Simulator instance can only submit one topology. Please have multiple Simulator instances * for multiple topologies. */ -public class Simulator { +public class Simulator implements TopologySubmitter { private static final Logger LOG = Logger.getLogger(Simulator.class.getName()); private final List instanceExecutors = new LinkedList<>(); @@ -96,6 +97,12 @@ protected void registerSystemConfig(SystemConfig sysConfig) { SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, sysConfig); } + public void submitTopologyInherited(String name, + Config heronConfig, + HeronTopology heronTopology) { + this.submitTopology(name, heronConfig, heronTopology); + } + /** * Submit and run topology in simulator * @param name topology name