support caching of inputs from non-required links#162
Conversation
5fd4f59 to
0b50483
Compare
loichuder
left a comment
There was a problem hiding this comment.
Looks ok to me but the new test seems to be failing?
Yes because we first need an |
|
workflow21 test fails after introducing This one was fine: OUT only gets non-required links flowchart TD
IN[in]
GT[greater_than]
OUT1[out1 default a=1]
OUT2[out2 default a=2]
OUT[out]
IN -->|a| GT
GT -.->|true| OUT1
GT -.->|false| OUT2
OUT1 -->|return_value to a| OUT
OUT2 -->|return_value to b| OUT
This one failed: OUT gets one required link unless we set flowchart TD
IN[in]
GT[raise_not_greater_than]
OUT1[out1 default a=1]
OUT2[out2 default a=2]
OUT[out]
IN -->|a| GT
GT -->|success| OUT1
GT -.->|error| OUT2
OUT1 -->|return_value to a| OUT
OUT2 -->|return_value to b| OUT
|
|
In ewoks-kit/ewokscore#433 I describe in the spec's that
However it becomes clear now that there are race-condition with undefined results. Suppose a node has incoming links A (required), B (not required, not cached) and C (not required, not cached). Then you could have these executions:
However if we get the trigger from B first, then C and then A we get only one execution
If we enabled the caching of B
These are some examples. Anyway, it is clear that we need to make this more deterministic. If not the order of the calls then at least the calls themselves. |
|
In the current state of this PR the
Because (3) only keeps the last trigger, all triggers of (3) are lost until (1) is fully filled. So I suggest to make (3) a list. And then when (1) is fully filled to execute several times while purging list (3). Of course the non-required links with caching (2) will make it still unpredictable with what parameter combination we execute. |
So, once all required inputs are filled, we will execute for each non-optional previous trigger. Personally, that could seem unexpected but I guess it depends what you want to do. |
Indeed. Wouldn't loosing triggers as we do now make it more unexpected? With that change we will know that the task will be executed with {A}, [{A+B}, {A+C}] where the first call always happens and the last two optionally in any order. So we have at least one call and at most 3 calls depending on the conditions on link B and C. Right now depending on the order in which the triggers arrive we have three possibilities
because depending on the order of the triggers A – B – C: {A}, [{A+B}, {A+C}] A – C – B: {A}, [{A+C}, {A+B}] B – A – C: {A}, [{A+C}] B – C – A: [{A+C}] C – A – B: {A+C}, [{A+B}] C – B – A: [{A+B}] With the proposed change
A – B – C: {A}, [{A+B}, {A+C}] A – C – B: {A}, [{A+C}, {A+B}] B – A – C: {A[+B]}, [{A+C}] B – C – A: {A[+B]}, [{A+C}] C – A – B: {A[+C]}, [{A+B}] C – B – A: [{A[+B]}] or [{A[+C]}] Of course when we introduce caching for the non-required B and C we would introduce again several possible calls (in terms of the merging, the number is till predictable). Lets say A (required), B (not required, not cached) and C (not required, cached) and we implement
A – B – C: {A}, [{A+B}, {A+C}] A – C – B: {A}, [{A+C}, {A+B+C}] B – A – C: {A}, [{A+C}] B – C – A: {A}, [{A+C}] C – A – B: {A+C}, [{A+B+C}] C – B – A: [{A+B+C}], {A+C} @LudoBroche @olofsvensson I'll need your input here. Edit: I keep editing the examples. Sorry, I don't know how to express it properly. Shows the complexity of this thing. |
Yes, there is no simple answer so both can be unexpected IMO. I don't have a problem to change the current behaviour, we just have to document is thoroughly. |
ffde4f2 to
85abd3e
Compare
48b9207 to
21875e3
Compare
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
|
This MR revealed an issue in the "scheduler thread count" in This PR depends on the following pre-releases:
Since the last review I added |
27633ad to
8bbdfff
Compare
|
A weakness in For example |
|
Ok I see a logical error in the workflow # Decider executed with inputs: {'metric_threshold': 35, 'param1': -1, 'param2': -2, 'metric1': 10, 'metric2': 20, 'metric3': 30, 'metric4': 40}
# Metrics that are good: ['metric4']
# Metrics that are bad: ['metric1', 'metric2', 'metric3']
# Metrics that are unknown: []
# Timeout: no
# TRIGGER NOTHINGIf 'metric4' is the newly arrive metric we should trigger GOOD. If 'metric3' is the newly arrive metric we should trigger nothing. There is no way for the On top of that, in this example 'metric4' arrives first and nothing was triggered because of the faulty def test_ppf_workflow25_timeout1(ppf_log_config, call_record_file: Path):
"""test 'cache_if_optional' links"""
# Metrics that pass: metric4
inputs = create_inputs(call_record_file, 35)
result = execute_graph(workflow(), inputs=inputs)
expected = {"state": "BAD", "reason": "timeout"}
> _assert_result(call_record_file, result, expected)
src\ewoksppf\tests\test_ppf_workflow25.py:278:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
call_record_file = WindowsPath('C:/Users/runneradmin/AppData/Local/Temp/pytest-of-runneradmin/pytest-0/test_ppf_workflow25_timeout10/calls.json')
result = {}, expected = {'reason': 'timeout', 'state': 'BAD'}
def _assert_result(
call_record_file: Path, result: Dict[str, str], expected: Dict[str, str]
) -> None:
calls = json.loads(call_record_file.read_text())
> assert calls == [expected]
E AssertionError: assert [] == [{'reason': '...tate': 'BAD'}]
E
E Right contains one more item: {'reason': 'timeout', 'state': 'BAD'}
E
E Full diff:
E + []
E - [
E - {
E - 'reason': 'timeout',
E - 'state': 'BAD',
E - },
E - ]
src\ewoksppf\tests\test_ppf_workflow25.py:305: AssertionError
---------------------------- Captured stdout call -----------------------------
Decider executed with inputs: {'metric_threshold': 35, 'param1': -1, 'param2': -2}
Metrics that are good: []
Metrics that are bad: []
Metrics that are unknown: ['metric1', 'metric2', 'metric3', 'metric4']
Timeout: no
TRIGGER NOTHING
Decider executed with inputs: {'metric_threshold': 35, 'param1': -1, 'param2': -2, 'metric1': 10}
Metrics that are good: []
Metrics that are bad: ['metric1']
Metrics that are unknown: ['metric2', 'metric3', 'metric4']
Timeout: no
TRIGGER NOTHING
Decider executed with inputs: {'metric_threshold': 35, 'param1': -1, 'param2': -2, 'metric1': 10, 'metric2': 20}
Metrics that are good: []
Metrics that are bad: ['metric1', 'metric2']
Metrics that are unknown: ['metric3', 'metric4']
Timeout: no
TRIGGER NOTHING
Decider executed with inputs: {'metric_threshold': 35, 'param1': -1, 'param2': -2, 'metric1': 10, 'metric2': 20, 'metric3': 30}
Metrics that are good: []
Metrics that are bad: ['metric1', 'metric2', 'metric3']
Metrics that are unknown: ['metric4']
Timeout: no
TRIGGER NOTHING
Decider executed with inputs: {'metric_threshold': 35, 'param1': -1, 'param2': -2, 'metric1': 10, 'metric2': 20, 'metric3': 30, 'metric4': 40}
Metrics that are good: ['metric4']
Metrics that are bad: ['metric1', 'metric2', 'metric3']
Metrics that are unknown: []
Timeout: no
TRIGGER NOTHING
Decider executed with inputs: {'metric_threshold': 35, 'param1': -1, 'param2': -2, 'metric1': 10, 'metric2': 20, 'metric3': 30, 'metric4': 40, 'timeout': True}
Metrics that are good: ['metric4']
Metrics that are bad: ['metric1', 'metric2', 'metric3']
Metrics that are unknown: []
Timeout: yes
TRIGGER NOTHING
=========================== short test summary info ===========================
SKIPPED [2] src\ewoksppf\tests\test_examples.py:12: Self-triggering workflow execution is inconsistent: https://github.com/ewoks-kit/ewoksppf/issues/16
SKIPPED [2] src\ewoksppf\tests\test_ppf_workflow23.py:83: Conditional branches that merge again are not handled yet
FAILED src/ewoksppf/tests/test_ppf_workflow25.py::test_ppf_workflow25_timeout1 - AssertionError: assert [] == [{'reason': '...tate': 'BAD'}]
Right contains one more item: {'reason': 'timeout', 'state': 'BAD'}
Full diff:
+ []
- [
- {
- 'reason': 'timeout',
- 'state': 'BAD',
- },
- ] |
|
Changes:
This is the workflow and it is executed with delays of [0, 0.5, 1] for all permutations of ["required", "optional", "retained"] to test all possibilities of trigger order of the different link types. graph LR
%% Nodes
required1[required1]
required2[required2]
optional1[optional1]
optional2[optional2]
retained1[retained1]
retained2[retained2]
gather[gather]
%% Required (always cached in Ewoks)
required1 -->|cached| gather
required2 -->|cached| gather
%% Optional + cached
optional1 -.->|cached| gather
optional2 -.->|cached| gather
%% Optional (not cached)
retained1 -.->|retained | gather
retained2 -.->|retained | gather
|
|
Still found a bug in the test when
The src/ewoksppf/tests/test_ppf_workflow25.py::test_ppf_workflow25[required-optional-retained] PASSED [ 16%]
src/ewoksppf/tests/test_ppf_workflow25.py::test_ppf_workflow25[required-retained-optional] PASSED [ 33%]
src/ewoksppf/tests/test_ppf_workflow25.py::test_ppf_workflow25[optional-required-retained] PASSED [ 50%]
src/ewoksppf/tests/test_ppf_workflow25.py::test_ppf_workflow25[optional-retained-required] PASSED [ 66%]
src/ewoksppf/tests/test_ppf_workflow25.py::test_ppf_workflow25[retained-required-optional] FAILED [ 83%]
=================================================================================================== FAILURES ===================================================================================================
_______________________________________________________________________________ test_ppf_workflow25[retained-required-optional] ________________________________________________________________________________
ppf_log_config = None, order = ('retained', 'required', 'optional')
@pytest.mark.parametrize("order", _ORDER, ids=["-".join(keys) for keys in _ORDER])
def test_ppf_workflow25(ppf_log_config, order):
"""Test input caching for different types of links executed in different orders."""
compute_times = [0, 0.5, 1]
inputs = get_inputs(**dict(zip(order, compute_times)))
result = execute_graph(workflow(), pool_type="thread", inputs=inputs)
cached = set(result["cached"])
cached1 = {"required1", "required2", "optional1", "optional2", "retained1"}
cached2 = {"required1", "required2", "optional1", "optional2", "retained2"}
> assert cached == cached1 or cached == cached2, cached
E AssertionError: {'optional1', 'required1', 'required2', 'retained2'}
E assert ({'required2', 'optional1', 'retained2', 'required1'} == {'optional1', 'required2', 'retained1', 'optional2', 'required1'}
E
E Extra items in the left set:
E 'retained2'
E Extra items in the right set:
E 'retained1'
E 'optional2'
E
E Full diff:
E {
E 'optional1',
E - 'optional2',
E 'required1',
E 'required2',
E - 'retained1',
E ? ^
E + 'retained2',
E ? ^
E } or {'required2', 'optional1', 'retained2', 'required1'} == {'optional1', 'required2', 'optional2', 'retained2', 'required1'}
E
E Extra items in the right set:
E 'optional2'
E
E Full diff:
E {
E 'optional1',
E - 'optional2',
E 'required1',
E 'required2',
E 'retained2',
E })
src/ewoksppf/tests/test_ppf_workflow25.py:152: AssertionError
--------------------------------------------------------------------------------------------- Captured stdout call ---------------------------------------------------------------------------------------------
Decider executed with inputs: {'required1': True, 'required2': True, 'retained1': True}
Decider executed with inputs:
{'required1': True, 'required2': True, 'retained2': True}
Decider executed with inputs:
Decider executed with inputs: {'required1': True, 'required2': True, 'optional1': True, 'optional2': True, 'retained2': True}
{'required1': True, 'required2': True, 'optional1': True, 'retained2': True}
=========================================================================================== short test summary info ============================================================================================
FAILED src/ewoksppf/tests/test_ppf_workflow25.py::test_ppf_workflow25[retained-required-optional] - AssertionError: {'optional1', 'required1', 'required2', 'retained2'}
assert ({'required2', 'optional1', 'retained2', 'required1'} == {'optional1', 'required2', 'retained1', 'optional2', 'required1'}
Extra items in the left set:
'retained2'
Extra items in the right set:
'retained1'
'optional2'
Full diff:
{
'optional1',
- 'optional2',
'required1',
'required2',
- 'retained1',
? ^
+ 'retained2',
? ^
} or {'required2', 'optional1', 'retained2', 'required1'} == {'optional1', 'required2', 'optional2', 'retained2', 'required1'}
Extra items in the right set:
'optional2'
Full diff:
{
'optional1',
- 'optional2',
'required1',
'required2',
'retained2',
}) |
…ion and rename cache_if_not_required to cache_if_optional
ba15423 to
fae606c
Compare
| def workflow21(on_error): | ||
| if on_error: | ||
| submodel21 = submodel21_on_error | ||
| out1_required = False |
There was a problem hiding this comment.
FYI this test fails in the main branch when on_error=True with ARG_FAILURE since ewokscore 5.0.0rc1
flowchart TD
n1["in"]
n2["raise_not_greater_than"]
n3["out1"]
n4["out2"]
n5["out"]
n1 --> n2
n2 --> n3
n2 -->|on error| n4
n3 -->|"required=False" fixes the test| n5
n4 --> n5
There was a problem hiding this comment.
Graph analysis before ewokscore 5.0.0rc1
in → gt required=True
gt → out1 required=True
gt → out2 required=False
out1 → out required=False
out2 → out required=False
Graph analysis since ewokscore 5.0.0rc1
in → gt required=True
gt → out1 required=True
gt → out2 required=False
out1 → out required=True
out2 → out required=False
With ARG_FAILURE the out1 node does not get executed. So if the graph analysis says it is required then node out never gets executed and the output of the workflow is empty which causes the test to fail.
There was a problem hiding this comment.
Before ewokscore 5.0.0rc1
def link_is_required(
graph: networkx.DiGraph, source_id: NodeIdType, target_id: NodeIdType
) -> bool:
if link_has_required(graph, source_id, target_id):
return True
if link_is_conditional(graph, source_id, target_id):
return False
not_required = node_has_ancestors(
graph, source_id, link_has_required=False, link_is_conditional=True
)
not_required |= node_has_ancestors(graph, source_id, node_has_error_handlers=True)
return not not_requiredSince ewokscore 5.0.0rc1
def link_is_required(
graph: networkx.DiGraph, source_id: NodeIdType, target_id: NodeIdType
) -> bool:
# Explicitly required or optional
if link_is_explicitly_required(graph, source_id, target_id):
return True
if link_is_explicitly_optional(graph, source_id, target_id):
return False
# By default, conditional links are optional
if link_is_conditional(graph, source_id, target_id):
return False
# By default, links with at least one non-required link upstream become non-required
return not node_has_ancestors(graph, source_id, link_is_required=False)There was a problem hiding this comment.
This condition makes the difference (which is now removed since 5.0.0rc1)
optional = node_has_ancestors(graph, source_id, node_has_error_handlers=True)It makes out1 → out optional because an ancestor node of out1 has an error handler ( that would be the gt node).
There was a problem hiding this comment.
There is no right or wrong answer here. We just need to be sure the ewoks SPEC is clear.
New SPEC of the link attribute `required
https://ewokscore.readthedocs.io/en/latest/reference/specs.html#link-attributes
required (optional): The link is required when set to True. The link is optional if False. A target node can only be executed after all its required predecessors have executed successfully. If a target has multiple required incoming links, it will be scheduled once all corresponding source tasks have completed (and may be scheduled multiple times as additional inputs from optional links arrive). See Node execution section for more details.
If the attribute is not explicitly specified (default behavior), the link is considered required when it is unconditional (i.e. has no conditions nor on_error=True) and all ancestors of the source node are connected through required links. Otherwise, the link is treated as optional.
Old SPEC of the link attribute `required
https://ewokscore.readthedocs.io/en/v4.0.2/reference/specs.html#link-attributes
required (optional): setting this to True marks the link as required. When a target receives multiple links, it will be executed (perhaps multiple times) when all the sources connected to the target with required links have been executed. A link is required when it is either “marked as required” (link attribute required=True) or “unconditional and all ancestors of the source node are required”.
Conclusion
Note that the old description does not actually describe the way the graph analysis considers ancestors to be required when required != True. Note also that required=False was ignored, so the same as required=None or not provided.
So ewokscore 5.0.0rc1 has modified the graph analysis result for link_is_required that was not documented before.
There was a problem hiding this comment.
So before ewokscore 5.0.0rc1 the graph analysis provides the desired required value of out1-out2 in this examples which after ewokscore 5.0.0rc1 it does not
flowchart TD
n1["in"]
n2["raise_not_greater_than"]
n3["out1"]
n4["out2"]
n5["out"]
n1 --> n2
n2 --> n3
n2 -->|on error| n4
n3 -->|"required=False" from old analysis which is desired| n5
n4 --> n5
Before ewokscore 5.0.0rc1 the graph analysis does NOT provide the desired required value of out1-out3 in this example while after ewokscore 5.0.0rc1 it does
flowchart TD
n1["in1"]
n2["raise_not_greater_than"]
n3["out1"]
n4["out2"]
n5["out3"]
n6["out4"]
n7["needed input"]
n1 --> n2
n2 --> n3
n2 -->|on error| n4
n3 -->|"required=False" from old analysis which is NOT desired| n5
n4 --> n6
n1 --> n7
n7 --> n5
When raise_not_greater_than raises we do not want out3 to be executed.
There was a problem hiding this comment.
Of course the long term goal is to make the ewokscore graph analysis more intelligent (discussion: https://confluence.esrf.fr/display/AAWWK/Graph+analysis) so that it looks at forking and merging of branches, not just looking at ancestor links being required or not.
There was a problem hiding this comment.
Workflows that will be affected but the graph analysis change:
- Daiquiri workflows: they want the new behavior (error handlers for mimosa never merge back into the main branch)
- BES workflows: sub-workflows tend to merge to a single output node so this change might require some explicit
required=False.
loichuder
left a comment
There was a problem hiding this comment.
I didn't understand all that happens in InputMergeActor but it looks consistent.
| global _GATHER_CACHE | ||
| _GATHER_CACHE = None | ||
| compute_times = [0, 0.5, 1] | ||
| inputs = get_inputs(**dict(zip(order, compute_times))) |
There was a problem hiding this comment.
That was a bit intense to read.
Perhaps the parameter should not be the order itself but rather the dict of compute times:
[{"required": 0, "optional": 0.5, "retained": 1}, ...
Could be generated by:
compute_times = [{"required": t[0], "optional": t[1], "retained": t[2]} for t in permutations([0, 0.5 ,1])]
There was a problem hiding this comment.
When you write it out fully it does not help imo.
This
_INPUT_ARGUMENTS = [
{"required": times[0], "optional": times[1], "retained": times[2]}
for times in itertools.permutations([0, 0.5, 1])
]
_INPUT_ARGUMENTS_IDS = [
"-".join(k for k, _ in sorted(i.items(), key=lambda item: item[1]))
for i in _INPUT_ARGUMENTS
]
@pytest.mark.parametrize("input_args", _INPUT_ARGUMENTS, ids=_INPUT_ARGUMENTS_IDS)
def test_ppf_workflow25(ppf_log_config, input_args):
inputs = get_inputs(**input_args)is less readable than this for me
_ORDER = list(itertools.permutations(["required", "optional", "retained"]))
@pytest.mark.parametrize("order", _ORDER, ids=["-".join(keys) for keys in _ORDER])
def test_ppf_workflow25(ppf_log_config, order):
compute_times = [0, 0.5, 1]
inputs = get_inputs(**dict(zip(order, compute_times)))There was a problem hiding this comment.
Feel free to make a follow-up PR if you see something better.
| len(self._cached_start_triggers), | ||
| len(self._cached_required_triggers), | ||
| len(self._cached_optional_triggers), | ||
| int(retained_inputs is not None), |
There was a problem hiding this comment.
So the number of retained links is always 1 or 0 ?
What is a "retained link" by the way? 😅
There was a problem hiding this comment.
There is no such thing as a "retained link". Inputs are cached, buffered or retained:
https://ewokscore.readthedocs.io/en/latest/reference/specs.html#summary
Buffered and retained is essentially the same from a link POV: inputs from optional non-cached links.
We buffer until the first execution (which happens when all required links are cached), after which the buffer is purged one-by-one (execute once for each item we purge) and the last one is retained.
There was a problem hiding this comment.
In the implementation I use two attributes (I added the type annotation in the code)
_buffer_optional_triggers:List[dict]
_retained_optional_trigger:Optional[dict]In the beginning I had only _buffer_optional_triggers on which we append and pop the inputs from optional non-cached links.
This matches the description better but the implementation is horrible due to the appending and popping which also depends on when it happens.
Now we
- use
_buffer_optional_triggersuntil the first execution: keep many to use later (aka buffering) - purge
_buffer_optional_triggerson first execution and put the last one in_retained_optional_trigger - after that only use
_retained_optional_trigger: keep only 1 (aka it retains the last one)
There was a problem hiding this comment.
So the number of retained links is always 1 or 0 ?
The number of retained inputs is always 1 or 0.
Co-authored-by: Loïc Huder <42204205+loichuder@users.noreply.github.com>
06aef3c to
d3749a9
Compare
Yes I tried many implementation and this one was most readable. But still I agree, not very easy to read. I think that's inevitable with the concepts of caching, buffering (before first execution) and retaining (after first execution). |
Closes #161 #167
Use case:
The task in the middle receives metric and has a threshold. We cache all inputs and this is what happens every time it gets triggered: