From cab695aea495a4307c2072a59514b4497d85e26e Mon Sep 17 00:00:00 2001 From: Vasiliy Ivanov Date: Tue, 22 Aug 2023 23:38:58 +1000 Subject: [PATCH 1/2] unify handling of outer parameters for joins 1. In opposite to Postgres, not all plan operators may be rescanned in gpdb. E.g., Motions must be scan only once. If we need to scan Motion output several times we should decorate it with Material operator. 2. The Nested loop operator may provide some outer parameters for the inner tree that depends on the current outer sub-tree tuple. Every time when the new outer tuple is fetched, inner sub-tree of the Nested Loop should be rescanned. Moreover, gpdb can't propagate outer parameters cross the slices, so the path to parameter's consumer may be considered as rescannable (there are no motions). Currently, only the Nested Loop operator handles presence of outer parameters (e.g. from ancestor Nested Loop) and make sure that both subtrees are rescannable if rescan expected (see the 25763c22b7). Hash and Merge Joins are passive and provides rescannable status based on their children only. But it's not enough to say about absence of possibility to rescan itself in case of there is outer parameter from ancestor, because rescan come here anyway even through Materialize appended above. To fix possible issues with non-rescannable operators rescan (see a provided regression test) we've decided to unify joins behavior. Hash Join has rescan implementation that differs from Postgres one. The hash table was modified to spill out all batches (including 0th) to disk (see the SpillCurrentBatch comment as a start point). So the hash join inner subtree always rescannable and hash join rescannability depends only on outer subtree. According to ExecReScanMergeJoin, Merge Join always rescan both subtrees in case of receiving rescan request. So we should make sure that both subtrees are rescannable. One plan was modified in gporca.sql regression tests file, and it becomes more consistent with ORCA plan. --- src/backend/executor/nodeHashjoin.c | 4 + src/backend/optimizer/util/pathnode.c | 120 ++++++++++++------ src/test/regress/expected/gporca.out | 18 +-- src/test/regress/expected/join_gp.out | 4 + .../regress/expected/join_gp_optimizer.out | 4 + src/test/regress/sql/join_gp.sql | 4 + 6 files changed, 107 insertions(+), 47 deletions(-) diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index d65daee42ba0..3f9f0d6f0bd8 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1190,6 +1190,10 @@ ExecReScanHashJoin(HashJoinState *node) * if it's a single-batch join, and there is no parameter change for the * inner subnode, then we can just re-use the existing hash table without * rebuilding it. + * + * GPDB: hybrid hash join was modified to spill out scanned batches + * (including 0th batch) to disk to provide rescannability if it was + * requested. See the SpillCurrentBatch comment for details. */ if (node->hj_HashTable != NULL) { diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 79233aab1560..765c07e0b93e 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -59,6 +59,8 @@ static CdbVisitOpt pathnode_walk_list(List *pathlist, static CdbVisitOpt pathnode_walk_kids(Path *path, CdbVisitOpt (*walker)(Path *path, void *context), void *context); +static Path* ensure_outer_rescannable(PlannerInfo *root, Path *outer_path); +static Path* ensure_inner_rescannable(PlannerInfo *root, Path *outer_path, Path *inner_path); /* * pathnode_walk_node @@ -224,6 +226,53 @@ pathnode_walk_kids(Path *path, } /* pathnode_walk_kids */ +static Path* +ensure_outer_rescannable(PlannerInfo *root, Path *outer_path) +{ + if (!outer_path->rescannable) + { + MaterialPath *matouter = create_material_path(root, outer_path->parent, outer_path); + + matouter->cdb_shield_child_from_rescans = true; + + outer_path = (Path *) matouter; + } + + return outer_path; +} + +static Path* +ensure_inner_rescannable(PlannerInfo *root, Path *outer_path, Path *inner_path) +{ + if (!inner_path->rescannable) + { + /* + * NLs potentially rescan the inner; if our inner path + * isn't rescannable we have to add a materialize node + */ + MaterialPath *matinner = create_material_path(root, inner_path->parent, inner_path); + + matinner->cdb_shield_child_from_rescans = true; + + /* + * If we have motion on the outer, to avoid a deadlock; we + * need to set cdb_strict. In order for materialize to + * fully fetch the underlying (required to avoid our + * deadlock hazard) we must set cdb_strict! + */ + if (inner_path->motionHazard && outer_path->motionHazard) + { + matinner->cdb_strict = true; + matinner->path.motionHazard = false; + } + + inner_path = (Path *) matinner; + } + + return inner_path; +} + + /***************************************************************************** * MISC. PATH UTILITIES *****************************************************************************/ @@ -3218,47 +3267,18 @@ create_nestloop_path(PlannerInfo *root, /* * If this join path is parameterized by a parameter above this path, then - * this path needs to be rescannable. A NestLoop is rescannable, when both + * this path needs to be rescannable. Joins are rescannable, when both * outer and inner paths rescannable, so make them both rescannable. */ - if (!outer_path->rescannable && !bms_is_empty(required_outer)) - { - MaterialPath *matouter = create_material_path(root, outer_path->parent, outer_path); - - matouter->cdb_shield_child_from_rescans = true; - - outer_path = (Path *) matouter; - } + if (!bms_is_empty(required_outer)) + outer_path = ensure_outer_rescannable(root, outer_path); /* * If outer has at most one row, NJ will make at most one pass over inner. * Else materialize inner rel after motion so NJ can loop over results. */ - if (!inner_path->rescannable && - (!outer_path->parent->onerow || !bms_is_empty(required_outer))) - { - /* - * NLs potentially rescan the inner; if our inner path - * isn't rescannable we have to add a materialize node - */ - MaterialPath *matinner = create_material_path(root, inner_path->parent, inner_path); - - matinner->cdb_shield_child_from_rescans = true; - - /* - * If we have motion on the outer, to avoid a deadlock; we - * need to set cdb_strict. In order for materialize to - * fully fetch the underlying (required to avoid our - * deadlock hazard) we must set cdb_strict! - */ - if (inner_path->motionHazard && outer_path->motionHazard) - { - matinner->cdb_strict = true; - matinner->path.motionHazard = false; - } - - inner_path = (Path *) matinner; - } + if (!outer_path->parent->onerow || !bms_is_empty(required_outer)) + inner_path = ensure_inner_rescannable(root, outer_path, inner_path); /* * If the inner path is parameterized by the outer, we must drop any @@ -3456,6 +3476,17 @@ create_mergejoin_path(PlannerInfo *root, inner_path->pathkeys) innersortkeys = NIL; + /* + * If this join path is parameterized by a parameter above this path, then + * this path needs to be rescannable. Joins are rescannable, when both + * outer and inner paths rescannable, so make them both rescannable. + */ + if (!bms_is_empty(required_outer)) + { + outer_path = ensure_outer_rescannable(root, outer_path); + inner_path = ensure_inner_rescannable(root, outer_path, inner_path); + } + pathnode->jpath.path.pathtype = T_MergeJoin; pathnode->jpath.path.parent = joinrel; pathnode->jpath.path.param_info = @@ -3573,6 +3604,17 @@ create_hashjoin_path(PlannerInfo *root, return NULL; } + /* + * If this join path is parameterized by a parameter above this path, then + * this path needs to be rescannable. Joins are rescannable, when both + * outer and inner paths rescannable, so make them both rescannable. + * + * Hash Join inner side was customized in the GPDB to be always rescannable + * (see the SpillCurrentBatch comment for details) + */ + if (!bms_is_empty(required_outer)) + outer_path = ensure_outer_rescannable(root, outer_path); + pathnode = makeNode(HashPath); pathnode->jpath.path.pathtype = T_HashJoin; @@ -3608,12 +3650,12 @@ create_hashjoin_path(PlannerInfo *root, /* final_cost_hashjoin will fill in pathnode->num_batches */ /* - * If hash table overflows to disk, and an ancestor node requests rescan - * (e.g. because the HJ is in the inner subtree of a NJ), then the HJ has - * to be redone, including rescanning the inner rel in order to rebuild - * the hash table. + * GPDB hash tables was modified to be rescannable even in case of hash + * table overflows to disk, and an ancestor node requests rescan + * (e.g. because the HJ is in the inner subtree of a NJ). + * See the SpillCurrentBatch comment for details */ - pathnode->jpath.path.rescannable = outer_path->rescannable && inner_path->rescannable; + pathnode->jpath.path.rescannable = outer_path->rescannable; /* see the comment above; we may have a motion hazard on our inner ?! */ if (pathnode->jpath.path.rescannable) diff --git a/src/test/regress/expected/gporca.out b/src/test/regress/expected/gporca.out index 8348b38c1107..94c46257bb5f 100644 --- a/src/test/regress/expected/gporca.out +++ b/src/test/regress/expected/gporca.out @@ -12588,15 +12588,16 @@ where out.b in (select coalesce(tcorr2_d.c, 99) group by a) tcorr2_d on tcorr1.a=tcorr2_d.a); QUERY PLAN ---------------------------------------------------------------------------------------------------------------------- - Nested Loop Semi Join (cost=10000000001.07..10000000003.21 rows=4 width=8) + Nested Loop Semi Join (cost=10000000001.07..10000000003.19 rows=4 width=8) Join Filter: ("out".b = COALESCE((count(*)), '99'::bigint)) -> Gather Motion 3:1 (slice1; segments: 3) (cost=0.00..1.03 rows=1 width=8) -> Seq Scan on tcorr1 "out" (cost=0.00..1.01 rows=1 width=8) -> Materialize (cost=1.07..2.16 rows=2 width=8) -> Hash Left Join (cost=1.07..2.14 rows=4 width=8) Hash Cond: (tcorr1.a = tcorr2.a) - -> Gather Motion 3:1 (slice2; segments: 3) (cost=0.00..1.03 rows=1 width=4) - -> Seq Scan on tcorr1 (cost=0.00..1.01 rows=1 width=4) + -> Materialize (cost=0.00..1.03 rows=1 width=4) + -> Gather Motion 3:1 (slice2; segments: 3) (cost=0.00..1.03 rows=1 width=4) + -> Seq Scan on tcorr1 (cost=0.00..1.01 rows=1 width=4) -> Hash (cost=1.06..1.06 rows=1 width=12) -> HashAggregate (cost=1.04..1.05 rows=1 width=12) Group Key: tcorr2.a @@ -12606,7 +12607,7 @@ where out.b in (select coalesce(tcorr2_d.c, 99) -> Gather Motion 3:1 (slice3; segments: 3) (cost=0.00..1.03 rows=1 width=8) -> Seq Scan on tcorr2 (cost=0.00..1.01 rows=1 width=8) Optimizer: Postgres query optimizer -(18 rows) +(19 rows) -- expect 1 row select * @@ -12704,15 +12705,16 @@ where out.b in (select coalesce(tcorr2_d.c, 99) group by a) tcorr2_d on tcorr1.a=tcorr2_d.a); QUERY PLAN ---------------------------------------------------------------------------------------------------------------------- - Nested Loop Semi Join (cost=10000000001.07..10000000003.21 rows=4 width=8) + Nested Loop Semi Join (cost=10000000001.07..10000000003.19 rows=4 width=8) Join Filter: ("out".b = COALESCE((count(*)), '99'::bigint)) -> Gather Motion 3:1 (slice1; segments: 3) (cost=0.00..1.03 rows=1 width=8) -> Seq Scan on tcorr1 "out" (cost=0.00..1.01 rows=1 width=8) -> Materialize (cost=1.07..2.16 rows=2 width=8) -> Hash Left Join (cost=1.07..2.14 rows=4 width=8) Hash Cond: (tcorr1.a = tcorr2.a) - -> Gather Motion 3:1 (slice2; segments: 3) (cost=0.00..1.03 rows=1 width=4) - -> Seq Scan on tcorr1 (cost=0.00..1.01 rows=1 width=4) + -> Materialize (cost=0.00..1.03 rows=1 width=4) + -> Gather Motion 3:1 (slice2; segments: 3) (cost=0.00..1.03 rows=1 width=4) + -> Seq Scan on tcorr1 (cost=0.00..1.01 rows=1 width=4) -> Hash (cost=1.06..1.06 rows=1 width=12) -> HashAggregate (cost=1.04..1.05 rows=1 width=12) Group Key: tcorr2.a @@ -12722,7 +12724,7 @@ where out.b in (select coalesce(tcorr2_d.c, 99) -> Gather Motion 3:1 (slice3; segments: 3) (cost=0.00..1.03 rows=1 width=8) -> Seq Scan on tcorr2 (cost=0.00..1.01 rows=1 width=8) Optimizer: Postgres query optimizer -(18 rows) +(19 rows) -- expect 1 row select * diff --git a/src/test/regress/expected/join_gp.out b/src/test/regress/expected/join_gp.out index 48871a385b0d..978173add6fb 100644 --- a/src/test/regress/expected/join_gp.out +++ b/src/test/regress/expected/join_gp.out @@ -5,6 +5,10 @@ -- start_matchignore -- m/ERROR: workfile compresssion is not supported by this build/ -- end_matchignore +-- start_matchsubs +-- m/ERROR: could not devise a query plan for the given query \(pathnode.c:\d+\)/ +-- s/ERROR: could not devise a query plan for the given query \(pathnode.c:\d+\)/ERROR: could not devise a query plan for the given query (pathnode.c:XX)/ +-- end_matchsubs -- -- test numeric hash join -- diff --git a/src/test/regress/expected/join_gp_optimizer.out b/src/test/regress/expected/join_gp_optimizer.out index e3f205c9dea2..f4487a31275a 100644 --- a/src/test/regress/expected/join_gp_optimizer.out +++ b/src/test/regress/expected/join_gp_optimizer.out @@ -5,6 +5,10 @@ -- start_matchignore -- m/ERROR: workfile compresssion is not supported by this build/ -- end_matchignore +-- start_matchsubs +-- m/ERROR: could not devise a query plan for the given query \(pathnode.c:\d+\)/ +-- s/ERROR: could not devise a query plan for the given query \(pathnode.c:\d+\)/ERROR: could not devise a query plan for the given query (pathnode.c:XX)/ +-- end_matchsubs -- -- test numeric hash join -- diff --git a/src/test/regress/sql/join_gp.sql b/src/test/regress/sql/join_gp.sql index bcb7cef1fd48..e625b17c802a 100644 --- a/src/test/regress/sql/join_gp.sql +++ b/src/test/regress/sql/join_gp.sql @@ -6,6 +6,10 @@ -- start_matchignore -- m/ERROR: workfile compresssion is not supported by this build/ -- end_matchignore +-- start_matchsubs +-- m/ERROR: could not devise a query plan for the given query \(pathnode.c:\d+\)/ +-- s/ERROR: could not devise a query plan for the given query \(pathnode.c:\d+\)/ERROR: could not devise a query plan for the given query (pathnode.c:XX)/ +-- end_matchsubs -- -- test numeric hash join From 9d7f4bc088164ff422a36221ea2e22784b65a71d Mon Sep 17 00:00:00 2001 From: Vasiliy Ivanov Date: Fri, 25 Aug 2023 20:07:50 +1000 Subject: [PATCH 2/2] fixme! revert rescannable calc for hash join node currently there is another problem connected with eager free resources for hash join operator. gpdb relies on REWIND executor flag to detect rescan sub-trees but NL doesn't provide it in case of presence of outer params. But appending Material under NL masks this problem. --- src/backend/optimizer/util/pathnode.c | 10 +++++----- src/test/regress/expected/gporca.out | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 765c07e0b93e..cc4f0a2b2ccb 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3650,12 +3650,12 @@ create_hashjoin_path(PlannerInfo *root, /* final_cost_hashjoin will fill in pathnode->num_batches */ /* - * GPDB hash tables was modified to be rescannable even in case of hash - * table overflows to disk, and an ancestor node requests rescan - * (e.g. because the HJ is in the inner subtree of a NJ). - * See the SpillCurrentBatch comment for details + * If hash table overflows to disk, and an ancestor node requests rescan + * (e.g. because the HJ is in the inner subtree of a NJ), then the HJ has + * to be redone, including rescanning the inner rel in order to rebuild + * the hash table. */ - pathnode->jpath.path.rescannable = outer_path->rescannable; + pathnode->jpath.path.rescannable = outer_path->rescannable && inner_path->rescannable; /* see the comment above; we may have a motion hazard on our inner ?! */ if (pathnode->jpath.path.rescannable) diff --git a/src/test/regress/expected/gporca.out b/src/test/regress/expected/gporca.out index 94c46257bb5f..496625bbcc35 100644 --- a/src/test/regress/expected/gporca.out +++ b/src/test/regress/expected/gporca.out @@ -12588,7 +12588,7 @@ where out.b in (select coalesce(tcorr2_d.c, 99) group by a) tcorr2_d on tcorr1.a=tcorr2_d.a); QUERY PLAN ---------------------------------------------------------------------------------------------------------------------- - Nested Loop Semi Join (cost=10000000001.07..10000000003.19 rows=4 width=8) + Nested Loop Semi Join (cost=10000000001.07..10000000003.21 rows=4 width=8) Join Filter: ("out".b = COALESCE((count(*)), '99'::bigint)) -> Gather Motion 3:1 (slice1; segments: 3) (cost=0.00..1.03 rows=1 width=8) -> Seq Scan on tcorr1 "out" (cost=0.00..1.01 rows=1 width=8) @@ -12705,7 +12705,7 @@ where out.b in (select coalesce(tcorr2_d.c, 99) group by a) tcorr2_d on tcorr1.a=tcorr2_d.a); QUERY PLAN ---------------------------------------------------------------------------------------------------------------------- - Nested Loop Semi Join (cost=10000000001.07..10000000003.19 rows=4 width=8) + Nested Loop Semi Join (cost=10000000001.07..10000000003.21 rows=4 width=8) Join Filter: ("out".b = COALESCE((count(*)), '99'::bigint)) -> Gather Motion 3:1 (slice1; segments: 3) (cost=0.00..1.03 rows=1 width=8) -> Seq Scan on tcorr1 "out" (cost=0.00..1.01 rows=1 width=8)