From 1e070b2e42ff0c3a370bf4f410299786544a26c0 Mon Sep 17 00:00:00 2001 From: Leiqing Cai Date: Tue, 3 Mar 2020 12:54:53 -0800 Subject: [PATCH 1/8] Improve Presto results conversion - Fix the failure when column is null. - Fix incorrect index in default result set converter. - Allow result rows to be ignored. --- .../presto/verifier/checksum/ChecksumResult.java | 5 +++-- .../verifier/framework/DataVerificationUtil.java | 2 +- .../framework/LimitQueryDeterminismAnalyzer.java | 2 +- .../verifier/prestoaction/JdbcPrestoAction.java | 2 +- .../presto/verifier/prestoaction/PrestoAction.java | 13 ++++++++----- .../TooManyOpenPartitionsFailureResolver.java | 2 +- .../verifier/prestoaction/TestJdbcPrestoAction.java | 2 +- 7 files changed, 16 insertions(+), 12 deletions(-) diff --git a/presto-verifier/src/main/java/com/facebook/presto/verifier/checksum/ChecksumResult.java b/presto-verifier/src/main/java/com/facebook/presto/verifier/checksum/ChecksumResult.java index ca843cc585b7f..95473f370c56e 100644 --- a/presto-verifier/src/main/java/com/facebook/presto/verifier/checksum/ChecksumResult.java +++ b/presto-verifier/src/main/java/com/facebook/presto/verifier/checksum/ChecksumResult.java @@ -21,6 +21,7 @@ import java.sql.Types; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -55,7 +56,7 @@ public Object getChecksum(String columnName) return checksums.get(columnName); } - public static ChecksumResult fromResultSet(ResultSet resultSet) + public static Optional fromResultSet(ResultSet resultSet) throws SQLException { long rowCount = resultSet.getLong(1); @@ -83,6 +84,6 @@ else if (metaData.getColumnType(i) == Types.BIGINT) { checksums.put(columnName, new SqlVarbinary((byte[]) checksum)); } } - return new ChecksumResult(rowCount, checksums); + return Optional.of(new ChecksumResult(rowCount, checksums)); } } diff --git 
a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/DataVerificationUtil.java b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/DataVerificationUtil.java index 187a0dc7faebd..68df2b6ef8009 100644 --- a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/DataVerificationUtil.java +++ b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/DataVerificationUtil.java @@ -83,7 +83,7 @@ public static void teardownSafely(PrestoAction prestoAction, @Nullable QueryBund public static List getColumns(PrestoAction prestoAction, TypeManager typeManager, QualifiedName tableName) { return prestoAction - .execute(new ShowColumns(tableName), DESCRIBE, resultSet -> Column.fromResultSet(typeManager, resultSet)) + .execute(new ShowColumns(tableName), DESCRIBE, resultSet -> Optional.of(Column.fromResultSet(typeManager, resultSet))) .getResults(); } diff --git a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/LimitQueryDeterminismAnalyzer.java b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/LimitQueryDeterminismAnalyzer.java index 5170f3dcc0e27..17c53cbf69fde 100644 --- a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/LimitQueryDeterminismAnalyzer.java +++ b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/LimitQueryDeterminismAnalyzer.java @@ -162,7 +162,7 @@ private LimitQueryDeterminismAnalysis analyzeLimitNoOrderBy(Query newLimitQuery, new TableSubquery(newLimitQuery)); QueryResult result = callWithQueryStatsConsumer( - () -> prestoAction.execute(rowCountQuery, DETERMINISM_ANALYSIS, resultSet -> resultSet.getLong(1)), + () -> prestoAction.execute(rowCountQuery, DETERMINISM_ANALYSIS, resultSet -> Optional.of(resultSet.getLong(1))), stats -> verificationContext.setLimitQueryAnalysisQueryId(stats.getQueryId())); long rowCountHigherLimit = getOnlyElement(result.getResults()); diff --git 
a/presto-verifier/src/main/java/com/facebook/presto/verifier/prestoaction/JdbcPrestoAction.java b/presto-verifier/src/main/java/com/facebook/presto/verifier/prestoaction/JdbcPrestoAction.java index 649d7f5ddce03..507c1f91a7eb8 100644 --- a/presto-verifier/src/main/java/com/facebook/presto/verifier/prestoaction/JdbcPrestoAction.java +++ b/presto-verifier/src/main/java/com/facebook/presto/verifier/prestoaction/JdbcPrestoAction.java @@ -128,7 +128,7 @@ private QueryResult executeOnce(Statement statement, QueryStage queryStag columnNames.add(resultSet.getMetaData().getColumnName(i)); } while (resultSet.next()) { - rows.add(converter.get().apply(resultSet)); + converter.get().apply(resultSet).ifPresent(rows::add); } } } diff --git a/presto-verifier/src/main/java/com/facebook/presto/verifier/prestoaction/PrestoAction.java b/presto-verifier/src/main/java/com/facebook/presto/verifier/prestoaction/PrestoAction.java index 2e7dd33b6ed5f..622846f120dd6 100644 --- a/presto-verifier/src/main/java/com/facebook/presto/verifier/prestoaction/PrestoAction.java +++ b/presto-verifier/src/main/java/com/facebook/presto/verifier/prestoaction/PrestoAction.java @@ -17,26 +17,29 @@ import com.facebook.presto.sql.tree.Statement; import com.facebook.presto.verifier.framework.QueryResult; import com.facebook.presto.verifier.framework.QueryStage; -import com.google.common.collect.ImmutableList; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; + +import static java.util.Collections.unmodifiableList; public interface PrestoAction { @FunctionalInterface interface ResultSetConverter { - R apply(ResultSet resultSet) + Optional apply(ResultSet resultSet) throws SQLException; ResultSetConverter> DEFAULT = resultSet -> { - ImmutableList.Builder row = ImmutableList.builder(); - for (int i = 0; i < resultSet.getMetaData().getColumnCount(); i++) { + List row = new ArrayList<>(); + for (int i = 1; i <= 
resultSet.getMetaData().getColumnCount(); i++) { row.add(resultSet.getObject(i)); } - return row.build(); + return Optional.of(unmodifiableList(row)); }; } diff --git a/presto-verifier/src/main/java/com/facebook/presto/verifier/resolver/TooManyOpenPartitionsFailureResolver.java b/presto-verifier/src/main/java/com/facebook/presto/verifier/resolver/TooManyOpenPartitionsFailureResolver.java index 61d9b41e132bd..6c5980c825d88 100644 --- a/presto-verifier/src/main/java/com/facebook/presto/verifier/resolver/TooManyOpenPartitionsFailureResolver.java +++ b/presto-verifier/src/main/java/com/facebook/presto/verifier/resolver/TooManyOpenPartitionsFailureResolver.java @@ -81,7 +81,7 @@ public Optional resolve(QueryStats controlQueryStats, QueryException que e -> { try { ShowCreate showCreate = new ShowCreate(TABLE, test.get().getTableName()); - String showCreateResult = getOnlyElement(prestoAction.execute(showCreate, DESCRIBE, resultSet -> resultSet.getString(1)).getResults()); + String showCreateResult = getOnlyElement(prestoAction.execute(showCreate, DESCRIBE, resultSet -> Optional.of(resultSet.getString(1))).getResults()); CreateTable createTable = (CreateTable) sqlParser.createStatement(showCreateResult, ParsingOptions.builder().setDecimalLiteralTreatment(AS_DOUBLE).build()); List bucketCountProperty = createTable.getProperties().stream() .filter(property -> property.getName().getValue().equals(BUCKET_COUNT_PROPERTY)) diff --git a/presto-verifier/src/test/java/com/facebook/presto/verifier/prestoaction/TestJdbcPrestoAction.java b/presto-verifier/src/test/java/com/facebook/presto/verifier/prestoaction/TestJdbcPrestoAction.java index e497affe060a1..3e58367c92579 100644 --- a/presto-verifier/src/test/java/com/facebook/presto/verifier/prestoaction/TestJdbcPrestoAction.java +++ b/presto-verifier/src/test/java/com/facebook/presto/verifier/prestoaction/TestJdbcPrestoAction.java @@ -105,7 +105,7 @@ public void testQuerySucceededWithConverter() QueryResult result = 
prestoAction.execute( sqlParser.createStatement("SELECT x FROM (VALUES (1), (2), (3)) t(x)", PARSING_OPTIONS), QUERY_STAGE, - resultSet -> resultSet.getInt("x") * resultSet.getInt("x")); + resultSet -> Optional.of(resultSet.getInt("x") * resultSet.getInt("x"))); assertEquals(result.getQueryStats().getState(), FINISHED.name()); assertEquals(result.getResults(), ImmutableList.of(1, 4, 9)); } From 7aed00893642e967c0cc9d310ab31e83a6d0efa4 Mon Sep 17 00:00:00 2001 From: Leiqing Cai Date: Tue, 3 Mar 2020 12:54:53 -0800 Subject: [PATCH 2/8] Support determinism analysis for simple ORDER BY LIMIT queries Support determinism analysis for simple queries with top-level ORDER BY LIMIT clause. Select, Insert, and CreateTableAsSelect are supported, but only when the query part is a Select query, not a SetOperation (i.e., Union, Intersect, and Except). "INSERT INTO ... SELECT ... ORDER BY ... LIMIT ..." is supported. "INSERT INTO ... SELECT ... UNION ALL ... SELECT ... ORDER BY ... LIMIT ..." is not supported. To check for determinism of ORDER BY LIMIT queries, we run the Select query with limit N + 1, project all the necessary ORDER BY columns, and check whether there is a tie on the ORDER BY columns for the N-th row and the (N+1)-th row. 
--- .../LimitQueryDeterminismAnalyzer.java | 173 +++++++++++++++++- .../verifier/framework/QueryResult.java | 2 +- .../TestLimitQueryDeterminismAnalyzer.java | 83 ++++++++- 3 files changed, 248 insertions(+), 10 deletions(-) diff --git a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/LimitQueryDeterminismAnalyzer.java b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/LimitQueryDeterminismAnalyzer.java index 17c53cbf69fde..7ab9b19832b3b 100644 --- a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/LimitQueryDeterminismAnalyzer.java +++ b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/LimitQueryDeterminismAnalyzer.java @@ -14,13 +14,17 @@ package com.facebook.presto.verifier.framework; import com.facebook.presto.sql.tree.CreateTableAsSelect; +import com.facebook.presto.sql.tree.Expression; import com.facebook.presto.sql.tree.FunctionCall; +import com.facebook.presto.sql.tree.Identifier; import com.facebook.presto.sql.tree.Insert; import com.facebook.presto.sql.tree.LongLiteral; +import com.facebook.presto.sql.tree.OrderBy; import com.facebook.presto.sql.tree.QualifiedName; import com.facebook.presto.sql.tree.Query; import com.facebook.presto.sql.tree.QuerySpecification; import com.facebook.presto.sql.tree.Select; +import com.facebook.presto.sql.tree.SelectItem; import com.facebook.presto.sql.tree.SingleColumn; import com.facebook.presto.sql.tree.Statement; import com.facebook.presto.sql.tree.TableSubquery; @@ -28,7 +32,15 @@ import com.facebook.presto.verifier.prestoaction.PrestoAction; import com.google.common.collect.ImmutableList; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; import static com.facebook.presto.sql.QueryUtil.simpleQuery; import static 
com.facebook.presto.verifier.framework.LimitQueryDeterminismAnalysis.DETERMINISTIC; @@ -37,9 +49,15 @@ import static com.facebook.presto.verifier.framework.LimitQueryDeterminismAnalysis.NOT_RUN; import static com.facebook.presto.verifier.framework.QueryStage.DETERMINISM_ANALYSIS; import static com.facebook.presto.verifier.framework.VerifierUtil.callWithQueryStatsConsumer; +import static com.facebook.presto.verifier.framework.VerifierUtil.delimitedIdentifier; +import static com.facebook.presto.verifier.prestoaction.PrestoAction.ResultSetConverter; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; import static java.lang.Long.parseLong; +import static java.lang.Math.toIntExact; +import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -130,9 +148,53 @@ private LimitQueryDeterminismAnalysis analyzeQuery(Query query) return analyzeLimitNoOrderBy(newLimitQuery, limit); } + /** + * To check whether all ORDER BY columns are matching between the n-th and the (n+1)-th row, we + * may need to project additional columns. Takes in the list of SelectItems of the original query + * and append additional SelectItems to the list. 
+ * + * @param selectItems A list of {@link SelectItem} of the original query + * @param orderBy ORDER BY clause + * @return the list of column keys to locate ORDER BY columns in the query result + */ + private List populateSelectItems(List selectItems, OrderBy orderBy) + { + Set aliases = selectItems.stream() + .filter(SingleColumn.class::isInstance) + .map(SingleColumn.class::cast) + .map(SingleColumn::getAlias) + .filter(Optional::isPresent) + .map(Optional::get) + .map(Identifier::getValue) + .collect(toImmutableSet()); + ImmutableList.Builder orderByKeys = ImmutableList.builder(); + + for (int i = 0; i < orderBy.getSortItems().size(); i++) { + Expression sortKey = orderBy.getSortItems().get(i).getSortKey(); + if (sortKey instanceof LongLiteral) { + // If sortKey is an long literal, it can be referenced by column index. + orderByKeys.add(ColumnNameOrIndex.forIndex(toIntExact(((LongLiteral) sortKey).getValue()) - 1)); + } + else if (sortKey instanceof Identifier && aliases.contains(((Identifier) sortKey).getValue())) { + // If sortKey is an identifier, it can either be an alias or a column name. + // It is impossible for two columns to have the same alias as sortKey, since otherwise a SYNTAX_ERROR will be thrown due to sortKey being ambiguous. + // It is possible that sortKey is both an alias and a column name. In that case, sortKey references the aliased column. + orderByKeys.add(ColumnNameOrIndex.forName(((Identifier) sortKey).getValue())); + } + else { + // If the sortKey is non-alias identifier, select the sortKey column, since it might not be selected or it might be aliased. + // If the sortKey is not an identifier, select the sortKey column. 
+ String columnName = "$$sort_key$$" + i; + selectItems.add(new SingleColumn(sortKey, delimitedIdentifier(columnName))); + orderByKeys.add(ColumnNameOrIndex.forName(columnName)); + } + } + return orderByKeys.build(); + } + private LimitQueryDeterminismAnalysis analyzeQuerySpecification(Optional with, QuerySpecification querySpecification) { - if (querySpecification.getOrderBy().isPresent() || !querySpecification.getLimit().isPresent()) { + if (!querySpecification.getLimit().isPresent()) { return NOT_RUN; } if (isLimitAll(querySpecification.getLimit().get())) { @@ -140,6 +202,27 @@ private LimitQueryDeterminismAnalysis analyzeQuerySpecification(Optional w } long limit = parseLong(querySpecification.getLimit().get()); Optional newLimit = Optional.of(Long.toString(limit + 1)); + Optional orderBy = querySpecification.getOrderBy(); + + if (orderBy.isPresent()) { + List selectItems = new ArrayList<>(querySpecification.getSelect().getSelectItems()); + List orderByKeys = populateSelectItems(selectItems, orderBy.get()); + return analyzeLimitOrderBy( + new Query( + with, + new QuerySpecification( + new Select(false, selectItems), + querySpecification.getFrom(), + querySpecification.getWhere(), + querySpecification.getGroupBy(), + querySpecification.getHaving(), + orderBy, + newLimit), + Optional.empty(), + Optional.empty()), + orderByKeys, + limit); + } Query newLimitQuery = new Query( with, new QuerySpecification( @@ -175,8 +258,96 @@ private LimitQueryDeterminismAnalysis analyzeLimitNoOrderBy(Query newLimitQuery, return FAILED_DATA_CHANGED; } + private LimitQueryDeterminismAnalysis analyzeLimitOrderBy(Query tieInspectorQuery, List orderByKeys, long limit) + { + QueryResult> result = callWithQueryStatsConsumer( + () -> prestoAction.execute(tieInspectorQuery, DETERMINISM_ANALYSIS, new TieInspector(limit)), + stats -> verificationContext.setLimitQueryAnalysisQueryId(stats.getQueryId())); + if (result.getResults().isEmpty()) { + return FAILED_DATA_CHANGED; + } + if 
(result.getResults().size() == 1) { + return DETERMINISTIC; + } + + List row1 = result.getResults().get(0); + List row2 = result.getResults().get(1); + checkState(row1.size() == row2.size(), "Rows have different sizes: %s %s", row1.size(), row2.size()); + + List columnNames = result.getColumnNames(); + Map columnIndices = new HashMap<>(); + for (int i = 0; i < columnNames.size(); i++) { + columnIndices.putIfAbsent(columnNames.get(i), i); + } + + for (ColumnNameOrIndex orderByKey : orderByKeys) { + int columnIndex = orderByKey.getIndex().isPresent() + ? orderByKey.getIndex().get() + : columnIndices.get(orderByKey.getName().orElseThrow(() -> new IllegalArgumentException(format("Invalid orderByKey: %s", orderByKey)))); + if (!Objects.equals(row1.get(columnIndex), row2.get(columnIndex))) { + return DETERMINISTIC; + } + } + return NON_DETERMINISTIC; + } + private static boolean isLimitAll(String limitClause) { return limitClause.toLowerCase(ENGLISH).equals("all"); } + + private static class ColumnNameOrIndex + { + private final Optional name; + private final Optional index; + + private ColumnNameOrIndex(Optional name, Optional index) + { + this.name = requireNonNull(name, "name is null"); + this.index = requireNonNull(index, "index is null"); + checkState(this.name.isPresent() ^ this.index.isPresent(), "Exactly one of name and index must be present: %s %s", this.name, this.index); + } + + public static ColumnNameOrIndex forName(String name) + { + return new ColumnNameOrIndex(Optional.of(name), Optional.empty()); + } + + public static ColumnNameOrIndex forIndex(int index) + { + return new ColumnNameOrIndex(Optional.empty(), Optional.of(index)); + } + + public Optional getName() + { + return name; + } + + public Optional getIndex() + { + return index; + } + } + + private static class TieInspector + implements ResultSetConverter> + { + private final long limit; + private long row; + + public TieInspector(long limit) + { + this.limit = limit; + } + + public Optional> 
apply(ResultSet resultSet) + throws SQLException + { + row++; + if (row != limit && row != limit + 1) { + return Optional.empty(); + } + return ResultSetConverter.DEFAULT.apply(resultSet); + } + } } diff --git a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/QueryResult.java b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/QueryResult.java index 6b3436beda5ff..e1b6af58dd754 100644 --- a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/QueryResult.java +++ b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/QueryResult.java @@ -29,7 +29,7 @@ public class QueryResult public QueryResult(List results, List columnNames, QueryStats queryStats) { this.results = ImmutableList.copyOf(results); - this.columnNames = requireNonNull(columnNames, "columnNames is null"); + this.columnNames = ImmutableList.copyOf(columnNames); this.queryStats = requireNonNull(queryStats, "queryStats is null"); } diff --git a/presto-verifier/src/test/java/com/facebook/presto/verifier/framework/TestLimitQueryDeterminismAnalyzer.java b/presto-verifier/src/test/java/com/facebook/presto/verifier/framework/TestLimitQueryDeterminismAnalyzer.java index 731fdccea26bc..ab3ddbe63c35d 100644 --- a/presto-verifier/src/test/java/com/facebook/presto/verifier/framework/TestLimitQueryDeterminismAnalyzer.java +++ b/presto-verifier/src/test/java/com/facebook/presto/verifier/framework/TestLimitQueryDeterminismAnalyzer.java @@ -22,8 +22,8 @@ import com.google.common.collect.ImmutableList; import org.testng.annotations.Test; +import java.util.List; import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; import static com.facebook.presto.sql.SqlFormatter.formatSql; import static com.facebook.presto.sql.parser.IdentifierSymbol.AT_SIGN; @@ -33,22 +33,24 @@ import static com.facebook.presto.verifier.framework.LimitQueryDeterminismAnalysis.FAILED_DATA_CHANGED; import static 
com.facebook.presto.verifier.framework.LimitQueryDeterminismAnalysis.NON_DETERMINISTIC; import static com.facebook.presto.verifier.framework.LimitQueryDeterminismAnalysis.NOT_RUN; -import static java.util.Objects.requireNonNull; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +@Test(singleThreaded = true) public class TestLimitQueryDeterminismAnalyzer { private static class MockPrestoAction implements PrestoAction { - private final AtomicLong rowCount; + private final List rows; + private final List columnNames; private Statement lastStatement; - public MockPrestoAction(AtomicLong rowCount) + public MockPrestoAction(List rows, List columnNames) { - this.rowCount = requireNonNull(rowCount, "rowCount is null"); + this.rows = ImmutableList.copyOf(rows); + this.columnNames = ImmutableList.copyOf(columnNames); } @Override @@ -62,7 +64,7 @@ public QueryStats execute(Statement statement, QueryStage queryStage) public QueryResult execute(Statement statement, QueryStage queryStage, ResultSetConverter converter) { lastStatement = statement; - return new QueryResult(ImmutableList.of(rowCount.get()), ImmutableList.of("_col0"), QUERY_STATS); + return new QueryResult(rows, columnNames, QUERY_STATS); } public Statement getLastStatement() @@ -76,6 +78,22 @@ public Statement getLastStatement() private static final ParsingOptions PARSING_OPTIONS = ParsingOptions.builder().setDecimalLiteralTreatment(AS_DOUBLE).build(); private static final SqlParser sqlParser = new SqlParser(new SqlParserOptions().allowIdentifierSymbol(COLON, AT_SIGN)); + private static final String ORDER_BY_LIMIT_QUERY = "INSERT INTO test\n" + + "SELECT\n" + + " a b,\n" + + " c,\n" + + " *,\n" + + " e\n" + + "FROM source\n" + + "ORDER BY\n" + + " a,\n" + + " b,\n" + + " 2 DESC,\n" + + " c + d DESC\n" + + "LIMIT\n" + + " 1000"; + private static final List TIE_INSPECTOR_COLUMNS = ImmutableList.of("b", "c", "e", "x", "y", 
"$$sort_key$$0", "$$sort_key$$3"); + @Test public void testNotRunLimitNoOrderBy() { @@ -87,7 +105,6 @@ public void testNotRunLimitNoOrderBy() // ORDER BY clause assertAnalysis(prestoAction, "INSERT INTO test SELECT * FROM source UNION ALL SELECT * FROM source ORDER BY 1 LIMIT 1000", NOT_RUN); - assertAnalysis(prestoAction, "INSERT INTO test SELECT * FROM source ORDER BY 1 LIMIT 1000", NOT_RUN); // No outer LIMIT clause assertAnalysis(prestoAction, "INSERT INTO test SELECT * FROM source UNION ALL SELECT * FROM source", NOT_RUN); @@ -127,9 +144,59 @@ public void testFailedDataChangedLimitNoOrderBy() assertAnalysis(createPrestoAction(1001), "INSERT INTO test SELECT * FROM source LIMIT 2000", FAILED_DATA_CHANGED); } + @Test + public void testLimitOrderByNonDeterministic() + { + MockPrestoAction prestoAction = createPrestoAction( + ImmutableList.of( + ImmutableList.of(1, 2, 3, 4, 5, 1, 6), + ImmutableList.of(1, 2, 0, 0, 0, 1, 6)), + TIE_INSPECTOR_COLUMNS); + assertAnalysis(prestoAction, ORDER_BY_LIMIT_QUERY, NON_DETERMINISTIC); + assertAnalyzerQuery( + prestoAction, + "SELECT\n" + + " a b\n" + + ", c\n" + + ", *\n" + + ", e\n" + + ", a \"$$sort_key$$0\"\n" + + ", (c + d) \"$$sort_key$$3\"\n" + + "FROM\n" + + " source\n" + + "ORDER BY a ASC, b ASC, 2 DESC, (c + d) DESC\n" + + "LIMIT 1001"); + } + + @Test + public void testLimitOrderByDeterministic() + { + MockPrestoAction prestoAction = createPrestoAction(ImmutableList.of(ImmutableList.of(1, 2, 3, 4, 5, 1, 6)), TIE_INSPECTOR_COLUMNS); + assertAnalysis(prestoAction, ORDER_BY_LIMIT_QUERY, DETERMINISTIC); + + prestoAction = createPrestoAction( + ImmutableList.of( + ImmutableList.of(1, 2, 3, 4, 5, 1, 6), + ImmutableList.of(1, 2, 0, 0, 0, 1, 5)), + TIE_INSPECTOR_COLUMNS); + assertAnalysis(prestoAction, ORDER_BY_LIMIT_QUERY, DETERMINISTIC); + } + + @Test + public void testLimitOrderByFailedDataChanged() + { + MockPrestoAction prestoAction = createPrestoAction(ImmutableList.of(), TIE_INSPECTOR_COLUMNS); + 
assertAnalysis(prestoAction, ORDER_BY_LIMIT_QUERY, FAILED_DATA_CHANGED); + } + private static MockPrestoAction createPrestoAction(long rowCount) { - return new MockPrestoAction(new AtomicLong(rowCount)); + return new MockPrestoAction(ImmutableList.of(rowCount), ImmutableList.of()); + } + + private static MockPrestoAction createPrestoAction(List> rows, List columnNames) + { + return new MockPrestoAction(rows, columnNames); } private static void assertAnalysis(PrestoAction prestoAction, String query, LimitQueryDeterminismAnalysis expectedAnalysis) From 1f774667198e72e29a124e8a39dc88b7a2e609d8 Mon Sep 17 00:00:00 2001 From: Tim Meehan Date: Tue, 3 Mar 2020 16:57:19 -0800 Subject: [PATCH 3/8] Always use JSON for PlanFragment serialization SMILE support has flaky edge cases, and we cache the serialization now which already has reduced the cost. --- .../main/java/com/facebook/presto/server/TaskResource.java | 7 +------ .../presto/server/remotetask/HttpRemoteTaskFactory.java | 3 +-- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java index e4b7ee6e1e5f7..8b0b5b35253ce 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java @@ -114,12 +114,7 @@ public TaskResource( this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null"); this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null"); this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null"); - if (communicationConfig.isBinaryTransportEnabled()) { - this.planFragmentCodec = planFragmentSmileCodec; - } - else { - this.planFragmentCodec = wrapJsonCodec(planFragmentJsonCodec); - } + this.planFragmentCodec = wrapJsonCodec(planFragmentJsonCodec); } @GET diff --git 
a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java index dac8dcb76f511..e7205761f091a 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java @@ -117,14 +117,13 @@ public HttpRemoteTaskFactory( this.taskStatusCodec = taskStatusSmileCodec; this.taskInfoCodec = taskInfoSmileCodec; this.taskUpdateRequestCodec = taskUpdateRequestSmileCodec; - this.planFragmentCodec = planFragmentSmileCodec; } else { this.taskStatusCodec = wrapJsonCodec(taskStatusJsonCodec); this.taskInfoCodec = wrapJsonCodec(taskInfoJsonCodec); this.taskUpdateRequestCodec = wrapJsonCodec(taskUpdateRequestJsonCodec); - this.planFragmentCodec = wrapJsonCodec(planFragmentJsonCodec); } + this.planFragmentCodec = wrapJsonCodec(planFragmentJsonCodec); this.updateScheduledExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("task-info-update-scheduler-%s")); this.errorScheduledExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("remote-task-error-delay-%s")); From b429ddb4eabd15ab1f7c06242b0ef39ab5bf0534 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Tue, 3 Mar 2020 13:54:42 -0500 Subject: [PATCH 4/8] Wait for final task info on abort Wait for final task info on abort to avoid missing task statistics in case when query finishes before the final task info is received by the TaskInfoFetcher. 
--- .../presto/server/remotetask/HttpRemoteTask.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java index 5aaefebb1d0d1..5d72f1cfbab8c 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java @@ -771,10 +771,6 @@ private synchronized void abort(TaskStatus status) checkState(status.getState().isDone(), "cannot abort task with an incomplete status"); try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) { - // With recoverable grouped execution, failed task does not necessarily fail the whole query. - // Not updating task info makes query unable to finish in tests because failed task is stuck in RUNNING state. - // TODO: Investigate why this only happens in TestHiveRecoverableGroupedExecution when worker is closed, but not in production test via cli. - taskInfoFetcher.updateTaskInfo(getTaskInfo().withTaskStatus(status)); taskStatusFetcher.updateTaskStatus(status); // send abort to task @@ -880,7 +876,15 @@ private void failTask(Throwable cause) log.debug(cause, "Remote task %s failed with %s", taskStatus.getSelf(), cause); } - abort(failWith(getTaskStatus(), FAILED, ImmutableList.of(toFailure(cause)))); + TaskStatus failedTaskStatus = failWith(getTaskStatus(), FAILED, ImmutableList.of(toFailure(cause))); + // Transition task to failed state without waiting for the final task info returned by the abort request. + // The abort request is very likely not to succeed, leaving the task and the stage in the limbo state for + // the entire duration of abort retries. If the task is failed, it is not that important to actually + // record the final statistics and the final information about a failed task. 
+ taskInfoFetcher.updateTaskInfo(getTaskInfo().withTaskStatus(failedTaskStatus)); + + // Initiate abort request + abort(failedTaskStatus); } private HttpUriBuilder getHttpUriBuilder(TaskStatus taskStatus) From 6d662bd06f953baa4a0278313941195e2489edb8 Mon Sep 17 00:00:00 2001 From: Mayank Garg Date: Tue, 3 Mar 2020 11:48:42 -0800 Subject: [PATCH 5/8] Fix finishStatisticsCollection to have correct ConnectorSession Without this fix, the ConnectorSession was created without any session properties and hence the implementation which tried to use session properties failed. --- .../main/java/com/facebook/presto/metadata/MetadataManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java index 1bc49bd10bd43..20c90e9197849 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java @@ -778,7 +778,7 @@ public void finishStatisticsCollection(Session session, AnalyzeTableHandle table { ConnectorId connectorId = tableHandle.getConnectorId(); CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, connectorId); - catalogMetadata.getMetadata().finishStatisticsCollection(session.toConnectorSession(), tableHandle.getConnectorHandle(), computedStatistics); + catalogMetadata.getMetadata().finishStatisticsCollection(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), computedStatistics); } @Override From b36f46321b4e83f2fcd32ca897f34cb489d7630b Mon Sep 17 00:00:00 2001 From: Bhavani Hari Date: Mon, 2 Mar 2020 11:27:11 -0800 Subject: [PATCH 6/8] Break presto-tests Travis jobs to reduce test times --- .travis.yml | 10 +++- presto-tests/pom.xml | 138 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 145 insertions(+), 3 deletions(-) diff --git a/.travis.yml 
b/.travis.yml index 362b1858320c5..564f7b7293791 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,8 +11,14 @@ env: matrix: - MAVEN_CHECKS=true - WEBUI_CHECKS=true - - TEST_SPECIFIC_MODULES=presto-tests - - TEST_SPECIFIC_MODULES=presto-tests TEST_FLAGS="-P ci-only" + - TEST_SPECIFIC_MODULES=presto-tests TEST_FLAGS="-P presto-tests-execution-memory" + - TEST_SPECIFIC_MODULES=presto-tests TEST_FLAGS="-P presto-tests-general" + - TEST_SPECIFIC_MODULES=presto-tests TEST_FLAGS="-P ci-only-distributed-non-hash-gen" + - TEST_SPECIFIC_MODULES=presto-tests TEST_FLAGS="-P ci-only-tpch-distributed-queries" + - TEST_SPECIFIC_MODULES=presto-tests TEST_FLAGS="-P ci-only-local-queries" + - TEST_SPECIFIC_MODULES=presto-tests TEST_FLAGS="-P ci-only-distributed-queries" + - TEST_SPECIFIC_MODULES=presto-tests TEST_FLAGS="-P ci-only-aggregation-queries" + - TEST_SPECIFIC_MODULES=presto-tests TEST_FLAGS="-P ci-only-plan-determinism" - TEST_SPECIFIC_MODULES=presto-raptor - TEST_SPECIFIC_MODULES=presto-accumulo - TEST_SPECIFIC_MODULES=presto-cassandra TEST_FLAGS="-P test-cassandra-integration-smoke-test" diff --git a/presto-tests/pom.xml b/presto-tests/pom.xml index ccbfd685878dc..cda6b09e54f0d 100644 --- a/presto-tests/pom.xml +++ b/presto-tests/pom.xml @@ -220,8 +220,13 @@ **/TestDistributedQueriesNoHashGeneration.java + **/TestTpchDistributedQueries.java **/TestLocalQueries.java **/TestNonIterativeDistributedQueries.java + **/TestAggregations.java + **/TestOptimizeMixedDistinctAggregations.java + **/TestSpilledAggregations.java + **/TestQueryPlanDeterminism.java @@ -245,7 +250,7 @@ - ci-only + ci-only-distributed-non-hash-gen @@ -255,7 +260,56 @@ **/TestDistributedQueriesNoHashGeneration.java + + + + + + + + ci-only-tpch-distributed-queries + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + **/TestTpchDistributedQueries.java + + + + + + + + ci-only-local-queries + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/TestLocalQueries.java + + + + + + + + 
ci-only-distributed-queries + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/TestNonIterativeDistributedQueries.java @@ -263,5 +317,87 @@ + + ci-only-plan-determinism + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + **/TestQueryPlanDeterminism.java + + + + + + + + + ci-only-aggregation-queries + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + **/TestAggregations.java + **/TestOptimizeMixedDistinctAggregations.java + **/TestSpilledAggregations.java + + + + + + + + + presto-tests-execution-memory + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/execution/*.java + **/connector/informationschema/*.java + **/memory/*.java + + + + + + + + presto-tests-general + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/tests/*.java + + + **/TestDistributedQueriesNoHashGeneration.java + **/TestTpchDistributedQueries.java + **/TestLocalQueries.java + **/TestNonIterativeDistributedQueries.java + **/TestAggregations.java + **/TestOptimizeMixedDistinctAggregations.java + **/TestSpilledAggregations.java + **/TestQueryPlanDeterminism.java + + + + + + From d8d3939f3348451cdc9a2d322e9901856a488a7f Mon Sep 17 00:00:00 2001 From: Ying Su Date: Tue, 3 Mar 2020 14:14:07 -0800 Subject: [PATCH 7/8] Materialize LazyBlock in scan Fix the problem downstream operators might receive LazyBlock after scan introduced by https://github.com/prestodb/presto/pull/14169 --- .../facebook/presto/operator/project/InputPageProjection.java | 2 +- .../repartition/OptimizedPartitionedOutputOperator.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/project/InputPageProjection.java b/presto-main/src/main/java/com/facebook/presto/operator/project/InputPageProjection.java index dba2d06c051f7..2cd6b4c89cced 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/project/InputPageProjection.java +++ 
b/presto-main/src/main/java/com/facebook/presto/operator/project/InputPageProjection.java @@ -64,7 +64,7 @@ public Work project(ConnectorSession session, DriverYieldSignal yieldSign result = block.copyPositions(selectedPositions.getPositions(), selectedPositions.getOffset(), selectedPositions.size()); } else if (selectedPositions.getOffset() == 0 && selectedPositions.size() == page.getPositionCount()) { - result = block; + result = block.getLoadedBlock(); } else { result = block.getRegion(selectedPositions.getOffset(), selectedPositions.size()); diff --git a/presto-main/src/main/java/com/facebook/presto/operator/repartition/OptimizedPartitionedOutputOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/repartition/OptimizedPartitionedOutputOperator.java index 1548ca34b349a..8c27bf2ce27e4 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/repartition/OptimizedPartitionedOutputOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/repartition/OptimizedPartitionedOutputOperator.java @@ -164,7 +164,6 @@ public void addInput(Page page) return; } - page = page.getLoadedPage(); page = pagePreprocessor.apply(page); pagePartitioner.partitionPage(page); From 0daad02e5674c4d2bb7b02a337b8f26b0a512b85 Mon Sep 17 00:00:00 2001 From: Sujay Jain Date: Wed, 4 Mar 2020 16:02:12 -0800 Subject: [PATCH 8/8] test 1 --- .../java/com/facebook/presto/mongodb/TestMongoClientConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoClientConfig.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoClientConfig.java index 42e4c35c2edfe..f9ebe7c5edc42 100644 --- a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoClientConfig.java +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoClientConfig.java @@ -35,7 +35,7 @@ public void testDefaults() .setConnectionsPerHost(100) .setMaxWaitTime(120_000) 
.setConnectionTimeout(10_000) - .setSocketTimeout(0) + .setSocketTimeout(10) .setSocketKeepAlive(false) .setSslEnabled(false) .setCursorBatchSize(0)