Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,26 @@ public void testBucketFilter()
}
}

// Physical Nodes cannot be tested here because we need the hive queryrunner
@Test
public void testSimpleCteNodes()
throws Exception
{

}

@Test
public void testComplexCteNodes()
{

}

@Test
public void testSimpleCteMaterialization()
{

}

private Session pushdownFilterEnabled()
{
return Session.builder(getQueryRunner().getDefaultSession())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

import java.util.List;

import static com.facebook.presto.SystemSessionProperties.CTE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
import static com.facebook.presto.SystemSessionProperties.RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY;
import static com.facebook.presto.SystemSessionProperties.USE_HISTORY_BASED_PLAN_STATISTICS;
import static com.facebook.presto.SystemSessionProperties.USE_PERFECTLY_CONSISTENT_HISTORIES;
Expand All @@ -49,8 +51,12 @@
import static com.facebook.presto.sql.planner.CanonicalPlanGenerator.generateCanonicalPlan;
import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
import static com.google.common.graph.Traverser.forTree;
import static io.airlift.tpch.TpchTable.CUSTOMER;
import static io.airlift.tpch.TpchTable.LINE_ITEM;
import static io.airlift.tpch.TpchTable.NATION;
import static io.airlift.tpch.TpchTable.ORDERS;
import static io.airlift.tpch.TpchTable.PART;
import static io.airlift.tpch.TpchTable.REGION;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
Expand All @@ -62,7 +68,7 @@ public class TestHiveCanonicalPlanHashes
protected QueryRunner createQueryRunner()
throws Exception
{
QueryRunner queryRunner = HiveQueryRunner.createQueryRunner(ImmutableList.of(ORDERS, LINE_ITEM));
QueryRunner queryRunner = HiveQueryRunner.createQueryRunner(ImmutableList.of(ORDERS, LINE_ITEM, CUSTOMER, NATION, REGION, PART));
queryRunner.installPlugin(new Plugin()
{
@Override
Expand Down Expand Up @@ -136,6 +142,70 @@ public void testCanonicalizationStrategies()
}
}

// Tests for CTE materialization
@Test

public void testCteMaterialization()
throws Exception
{
QueryRunner queryRunner = getQueryRunner();

String query1 = "WITH temp as (SELECT orderkey FROM orders) " +
"SELECT * FROM temp t1 ";
assertSamePlanHash(
query1,
createMaterializedSession(),
query1,
createMaterializedSession(),
CONNECTOR);

assertSamePlanHash(
"WITH cte1 AS (SELECT orderkey FROM orders WHERE orderkey < 100), " +
" cte2 AS (SELECT custkey FROM customer WHERE custkey < 50) " +
"SELECT * FROM cte1, cte2 WHERE cte1.orderkey = cte2.custkey",
createMaterializedSession(),
"WITH cte2 AS (SELECT custkey FROM customer WHERE custkey < 50), " +
" cte1 AS (SELECT orderkey FROM orders WHERE orderkey < 100) " +
"SELECT * FROM cte1, cte2 WHERE cte1.orderkey = cte2.custkey",
createMaterializedSession(),
CONNECTOR);

assertDifferentPlanHash(
"WITH cte1 AS (SELECT orderkey FROM orders WHERE orderkey < 100), " +
" cte2 AS (SELECT custkey FROM customer WHERE custkey < 50) " +
"SELECT * FROM cte1, cte2 WHERE cte1.orderkey = cte2.custkey",
createMaterializedSession(),
"WITH cte2 AS (SELECT orderkey FROM orders WHERE orderkey < 100), " +
" cte1 AS (SELECT custkey FROM customer WHERE custkey < 50) " +
"SELECT * FROM cte1, cte2 WHERE cte2.orderkey = cte1.custkey",
createMaterializedSession(),
CONNECTOR);

String complexQuery = "WITH customer_nation AS (" +
" SELECT c.custkey, c.name, n.name AS nation_name, r.name AS region_name " +
" FROM CUSTOMER c " +
" JOIN NATION n ON c.nationkey = n.nationkey " +
" JOIN REGION r ON n.regionkey = r.regionkey), " +
" customer_orders AS (" +
" SELECT co.custkey, co.name, co.nation_name, co.region_name, o.orderkey, o.orderdate " +
" FROM customer_nation co " +
" JOIN ORDERS o ON co.custkey = o.custkey), " +
"order_lineitems AS (" +
" SELECT co.*, l.partkey, l.quantity, l.extendedprice " +
" FROM customer_orders co " +
" JOIN lineitem l ON co.orderkey = l.orderkey), " +
" customer_part_analysis AS (" +
" SELECT ol.*, p.name AS part_name, p.type AS part_type " +
" FROM order_lineitems ol " +
" JOIN PART p ON ol.partkey = p.partkey) " +
"SELECT * FROM customer_part_analysis " +
"WHERE region_name = 'AMERICA' " +
"ORDER BY nation_name, custkey, orderdate";
assertSamePlanHash(complexQuery, createMaterializedSession(),
complexQuery, createMaterializedSession(),
CONNECTOR);
}

@Test
public void testStatsEquivalentNodeMarking()
{
Expand All @@ -159,6 +229,14 @@ public void testStatsEquivalentNodeMarking()
}
}

private void assertSamePlanHash(String sql1, Session session1, String sql2, Session session2, PlanCanonicalizationStrategy strategy)
throws Exception
{
String hashes1 = getPlanHash(sql1, session1, strategy);
String hashes2 = getPlanHash(sql2, session2, strategy);
assertEquals(hashes1, hashes2);
}

private void assertSamePlanHash(String sql1, String sql2, PlanCanonicalizationStrategy strategy)
throws Exception
{
Expand All @@ -167,6 +245,14 @@ private void assertSamePlanHash(String sql1, String sql2, PlanCanonicalizationSt
assertEquals(hashes1, hashes2);
}

private void assertDifferentPlanHash(String sql1, Session session1, String sql2, Session session2, PlanCanonicalizationStrategy strategy)
throws Exception
{
String hashes1 = getPlanHash(sql1, session1, strategy);
String hashes2 = getPlanHash(sql2, session2, strategy);
assertNotEquals(hashes1, hashes2);
}

private void assertDifferentPlanHash(String sql1, String sql2, PlanCanonicalizationStrategy strategy)
throws Exception
{
Expand All @@ -175,6 +261,15 @@ private void assertDifferentPlanHash(String sql1, String sql2, PlanCanonicalizat
assertNotEquals(hashes1, hashes2);
}

private String getPlanHash(String sql, Session session, PlanCanonicalizationStrategy strategy)
throws Exception
{
PlanNode plan = plan(sql, session).getRoot();
ObjectMapper objectMapper = createObjectMapper();
assertTrue(plan.getStatsEquivalentPlanNode().isPresent());
return objectMapper.writeValueAsString(generateCanonicalPlan(plan.getStatsEquivalentPlanNode().get(), strategy, objectMapper, session).get());
}

private String getPlanHash(String sql, PlanCanonicalizationStrategy strategy)
throws Exception
{
Expand Down Expand Up @@ -207,4 +302,12 @@ private Session createSession()
.setSystemProperty(RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY, "false")
.build();
}

public Session createMaterializedSession()
{
return Session.builder(createSession())
.setSystemProperty(CTE_MATERIALIZATION_STRATEGY, "ALL")
.setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

import static com.facebook.presto.SystemSessionProperties.CTE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
import static com.facebook.presto.SystemSessionProperties.RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY;
import static com.facebook.presto.SystemSessionProperties.TRACK_HISTORY_BASED_PLAN_STATISTICS;
import static com.facebook.presto.SystemSessionProperties.USE_HISTORY_BASED_PLAN_STATISTICS;
Expand Down Expand Up @@ -151,19 +153,93 @@ public void testBroadcastJoin()
}
}

@Test
public void testSimpleCteMaterialization()
{
String query = "WITH temp as (SELECT orderkey FROM orders) " +
"SELECT * FROM temp t1 ";
// assertPlan(
// "SELECT O.totalprice, C.name FROM orders O JOIN (SELECT name, custkey FROM customer UNION ALL SELECT * FROM (VALUES ('unknown', NULL)) t(name, custkey)) C ON C.custkey = O.custkey AND YEAR(O.orderdate) = 1995",
// anyTree(node(JoinNode.class, anyTree(any()), anyTree(any())).withOutputRowCount(2204).withJoinStatistics(1501, 1, 2204, 0)));

// CBO stats
// CBO Statistics
executeAndTrackHistory(query, createMaterializedSession());

// CBO stats
executeAndTrackHistory(query, createMaterializedSession());
// assertSamePlanHash(
// query1,
// getMaterializedSession(),
// query1,
// getMaterializedSession(),
// CONNECTOR);
}

@Test
public void testSimpleCteMaterialization1()
{
String query = "WITH temp as (SELECT orderkey FROM orders) " +
"SELECT * FROM temp t1 ";
// assertPlan(
// "SELECT O.totalprice, C.name FROM orders O JOIN (SELECT name, custkey FROM customer UNION ALL SELECT * FROM (VALUES ('unknown', NULL)) t(name, custkey)) C ON C.custkey = O.custkey AND YEAR(O.orderdate) = 1995",
// anyTree(node(JoinNode.class, anyTree(any()), anyTree(any())).withOutputRowCount(2204).withJoinStatistics(1501, 1, 2204, 0)));

// CBO stats
// CBO Statistics
executeAndTrackHistory(query, createCBOMaterializedSession());

// CBO stats
executeAndTrackHistory(query, createCBOMaterializedSession());
// assertSamePlanHash(
// query1,
// getMaterializedSession(),
// query1,
// getMaterializedSession(),
// CONNECTOR);
}

@Test
public void testSimpleCteMaterialization2()
{
String query = "WITH temp as (SELECT count(*) FROM orders) " +
"SELECT * FROM temp t1 UNION SELECT * FROM temp t1 ";
// assertPlan(
// "SELECT O.totalprice, C.name FROM orders O JOIN (SELECT name, custkey FROM customer UNION ALL SELECT * FROM (VALUES ('unknown', NULL)) t(name, custkey)) C ON C.custkey = O.custkey AND YEAR(O.orderdate) = 1995",
// anyTree(node(JoinNode.class, anyTree(any()), anyTree(any())).withOutputRowCount(2204).withJoinStatistics(1501, 1, 2204, 0)));

// CBO stats
// CBO Statistics
executeAndTrackHistory(query, createCBOMaterializedSession());

// CBO stats
executeAndTrackHistory(query, createCBOMaterializedSession());
// assertSamePlanHash(
// query1,
// getMaterializedSession(),
// query1,
// getMaterializedSession(),
// CONNECTOR);
}

@Override
protected void assertPlan(@Language("SQL") String query, PlanMatchPattern pattern)
{
assertPlan(createSession(), query, pattern);
}

private void executeAndTrackHistory(String sql)
{
executeAndTrackHistory(sql, createSession());
}

private void executeAndTrackHistory(String sql, Session session)
{
DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();
SqlQueryManager sqlQueryManager = (SqlQueryManager) queryRunner.getCoordinator().getQueryManager();
InMemoryHistoryBasedPlanStatisticsProvider provider = (InMemoryHistoryBasedPlanStatisticsProvider) sqlQueryManager.getHistoryBasedPlanStatisticsTracker().getHistoryBasedPlanStatisticsProvider();

queryRunner.execute(createSession(), sql);
queryRunner.execute(session, sql);
provider.waitProcessQueryEvents();
}

Expand All @@ -177,4 +253,20 @@ private Session createSession()
.setSystemProperty(RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY, "false")
.build();
}

public Session createMaterializedSession()
{
return Session.builder(createSession())
.setSystemProperty(CTE_MATERIALIZATION_STRATEGY, "ALL")
.setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive")
.build();
}

public Session createCBOMaterializedSession()
{
return Session.builder(createSession())
.setSystemProperty(CTE_MATERIALIZATION_STRATEGY, "COST_BASED")
.setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationIfToFilterRewriteStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.CteMaterializationStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinNotNullInferenceStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
Expand Down Expand Up @@ -184,6 +185,7 @@ public final class SystemSessionProperties
public static final String MAX_DRIVERS_PER_TASK = "max_drivers_per_task";
public static final String MAX_TASKS_PER_STAGE = "max_tasks_per_stage";
public static final String DEFAULT_FILTER_FACTOR_ENABLED = "default_filter_factor_enabled";
public static final String CTE_MATERIALIZATION_STRATEGY = "cte_materialization_strategy";
public static final String DEFAULT_JOIN_SELECTIVITY_COEFFICIENT = "default_join_selectivity_coefficient";
public static final String PUSH_LIMIT_THROUGH_OUTER_JOIN = "push_limit_through_outer_join";
public static final String OPTIMIZE_CONSTANT_GROUPING_KEYS = "optimize_constant_grouping_keys";
Expand Down Expand Up @@ -1023,6 +1025,18 @@ public SystemSessionProperties(
"use a default filter factor for unknown filters in a filter node",
featuresConfig.isDefaultFilterFactorEnabled(),
false),
new PropertyMetadata<>(
CTE_MATERIALIZATION_STRATEGY,
format("The strategy to materialize common table expressions. Options are %s",
Stream.of(CteMaterializationStrategy.values())
.map(CteMaterializationStrategy::name)
.collect(joining(","))),
VARCHAR,
CteMaterializationStrategy.class,
featuresConfig.getCteMaterializationStrategy(),
false,
value -> CteMaterializationStrategy.valueOf(((String) value).toUpperCase()),
CteMaterializationStrategy::name),
new PropertyMetadata<>(
DEFAULT_JOIN_SELECTIVITY_COEFFICIENT,
"use a default join selectivity coefficient factor when column statistics are not available in a join node",
Expand Down Expand Up @@ -2280,6 +2294,11 @@ public static DataSize getFilterAndProjectMinOutputPageSize(Session session)
return session.getSystemProperty(FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_SIZE, DataSize.class);
}

public static CteMaterializationStrategy getCteMaterializationStrategy(Session session)
{
return session.getSystemProperty(CTE_MATERIALIZATION_STRATEGY, CteMaterializationStrategy.class);
}

public static int getFilterAndProjectMinOutputPageRowCount(Session session)
{
return session.getSystemProperty(FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_ROW_COUNT, Integer.class);
Expand Down
Loading