Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.spi.plan.CteConsumerNode;
import com.facebook.presto.spi.plan.CteProducerNode;
import com.facebook.presto.spi.plan.CteReferenceNode;
import com.facebook.presto.spi.plan.DataOrganizationSpecification;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.FilterNode;
Expand Down Expand Up @@ -480,7 +481,7 @@ public Optional<PlanNode> visitWindow(WindowNode node, Context context)
.sorted(comparing(this::writeValueAsString))
.collect(toImmutableSet());

WindowNode.Specification specification = new WindowNode.Specification(
DataOrganizationSpecification specification = new DataOrganizationSpecification(
node.getSpecification().getPartitionBy().stream()
.map(variable -> inlineAndCanonicalize(context.getExpressions(), variable))
.sorted(comparing(this::writeValueAsString))
Expand Down Expand Up @@ -694,7 +695,7 @@ public Optional<PlanNode> visitTopNRowNumber(TopNRowNumberNode node, Context con
Optional.empty(),
planNodeidAllocator.getNextId(),
source.get(),
new WindowNode.Specification(
new DataOrganizationSpecification(
partitionBy,
node.getSpecification().getOrderingScheme().map(scheme -> getCanonicalOrderingScheme(scheme, context.getExpressions()))),
rowNumberVariable,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner;

import com.facebook.presto.common.block.SortOrder;
import com.facebook.presto.sql.tree.SortItem;

public class OrderingTranslator
{
private OrderingTranslator() {}

public static SortOrder sortItemToSortOrder(SortItem sortItem)
{
if (sortItem.getOrdering() == SortItem.Ordering.ASCENDING) {
if (sortItem.getNullOrdering() == SortItem.NullOrdering.FIRST) {
return SortOrder.ASC_NULLS_FIRST;
}
return SortOrder.ASC_NULLS_LAST;
}

if (sortItem.getNullOrdering() == SortItem.NullOrdering.FIRST) {
return SortOrder.DESC_NULLS_FIRST;
}
return SortOrder.DESC_NULLS_LAST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.AggregationNode.Aggregation;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.DataOrganizationSpecification;
import com.facebook.presto.spi.plan.DeleteNode;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.LimitNode;
Expand Down Expand Up @@ -114,6 +115,7 @@
import static com.facebook.presto.sql.analyzer.ExpressionAnalyzer.isNumericType;
import static com.facebook.presto.sql.analyzer.ExpressionTreeUtils.getSourceLocation;
import static com.facebook.presto.sql.analyzer.TypeSignatureProvider.fromTypes;
import static com.facebook.presto.sql.planner.OrderingTranslator.sortItemToSortOrder;
import static com.facebook.presto.sql.planner.PlannerUtils.newVariable;
import static com.facebook.presto.sql.planner.PlannerUtils.toOrderingScheme;
import static com.facebook.presto.sql.planner.PlannerUtils.toSortOrder;
Expand Down Expand Up @@ -512,7 +514,7 @@ private PlanBuilder project(PlanBuilder subPlan, Iterable<Expression> expression
*
* @return the new subplan and a mapping of each expression to the symbol representing the coercion or an existing symbol if a coercion wasn't needed
*/
private PlanAndMappings coerce(PlanBuilder subPlan, List<Expression> expressions, Analysis analysis, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, Metadata metadata)
public PlanAndMappings coerce(PlanBuilder subPlan, List<Expression> expressions, Analysis analysis, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, Metadata metadata)
{
Assignments.Builder assignments = Assignments.builder();
assignments.putAll(subPlan.getRoot().getOutputVariables().stream().collect(toImmutableMap(Function.identity(), Function.identity())));
Expand Down Expand Up @@ -543,6 +545,28 @@ private PlanAndMappings coerce(PlanBuilder subPlan, List<Expression> expressions
return new PlanAndMappings(subPlan, mappings.build());
}

public static OrderingScheme translateOrderingScheme(List<SortItem> items, Function<com.facebook.presto.sql.tree.Expression, VariableReferenceExpression> coercions)
{
List<VariableReferenceExpression> coerced = items.stream()
.map(SortItem::getSortKey)
.map(coercions)
.collect(toImmutableList());

ImmutableList.Builder<VariableReferenceExpression> variables = ImmutableList.builder();
Map<VariableReferenceExpression, Ordering> orders = new HashMap<>();
for (int i = 0; i < coerced.size(); i++) {
VariableReferenceExpression variable = coerced.get(i);
// for multiple sort items based on the same expression, retain the first one:
// ORDER BY x DESC, x ASC, y --> ORDER BY x DESC, y
if (!orders.containsKey(variable)) {
variables.add(variable);
orders.put(variable, new Ordering(variable, sortItemToSortOrder(items.get(i))));
}
}

return new OrderingScheme(new ArrayList<Ordering>(orders.values()));
}

private Map<VariableReferenceExpression, RowExpression> coerce(Iterable<? extends Expression> expressions, PlanBuilder subPlan, TranslationMap translations)
{
ImmutableMap.Builder<VariableReferenceExpression, RowExpression> projections = ImmutableMap.builder();
Expand Down Expand Up @@ -1069,7 +1093,7 @@ else if (window.getFrame().isPresent()) {
subPlan.getRoot().getSourceLocation(),
idAllocator.getNextId(),
subPlan.getRoot(),
new WindowNode.Specification(
new DataOrganizationSpecification(
partitionByVariables.build(),
orderingScheme),
ImmutableMap.of(newVariable, function),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.CteReferenceNode;
import com.facebook.presto.spi.plan.DataOrganizationSpecification;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.ExceptNode;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.IntersectNode;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.OrderingScheme;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
Expand All @@ -50,13 +52,13 @@
import com.facebook.presto.sql.analyzer.RelationId;
import com.facebook.presto.sql.analyzer.RelationType;
import com.facebook.presto.sql.analyzer.Scope;
import com.facebook.presto.sql.analyzer.SemanticException;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.optimizations.JoinNodeUtils;
import com.facebook.presto.sql.planner.optimizations.SampleNodeUtil;
import com.facebook.presto.sql.planner.plan.LateralJoinNode;
import com.facebook.presto.sql.planner.plan.SampleNode;
import com.facebook.presto.sql.planner.plan.TableFunctionNode;
import com.facebook.presto.sql.planner.plan.TableFunctionNode.TableArgumentProperties;
import com.facebook.presto.sql.planner.plan.UnnestNode;
import com.facebook.presto.sql.tree.AliasedRelation;
import com.facebook.presto.sql.tree.Cast;
Expand Down Expand Up @@ -87,16 +89,15 @@
import com.facebook.presto.sql.tree.SetOperation;
import com.facebook.presto.sql.tree.SymbolReference;
import com.facebook.presto.sql.tree.Table;
import com.facebook.presto.sql.tree.TableFunctionDescriptorArgument;
import com.facebook.presto.sql.tree.TableFunctionInvocation;
import com.facebook.presto.sql.tree.TableFunctionTableArgument;
import com.facebook.presto.sql.tree.TableSubquery;
import com.facebook.presto.sql.tree.Union;
import com.facebook.presto.sql.tree.Unnest;
import com.facebook.presto.sql.tree.Values;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.UnmodifiableIterator;
Expand Down Expand Up @@ -126,9 +127,9 @@
import static com.facebook.presto.sql.analyzer.ExpressionTreeUtils.isEqualComparisonExpression;
import static com.facebook.presto.sql.analyzer.ExpressionTreeUtils.resolveEnumLiteral;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.CteMaterializationStrategy.NONE;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.analyzer.SemanticExceptions.notSupportedException;
import static com.facebook.presto.sql.planner.PlannerUtils.newVariable;
import static com.facebook.presto.sql.planner.QueryPlanner.translateOrderingScheme;
import static com.facebook.presto.sql.planner.TranslateExpressionsUtil.toRowExpression;
import static com.facebook.presto.sql.tree.Join.Type.INNER;
import static com.facebook.presto.sql.tree.Join.Type.LEFT;
Expand Down Expand Up @@ -235,48 +236,103 @@ protected RelationPlan visitTable(Table node, SqlPlannerContext context)
@Override
protected RelationPlan visitTableFunctionInvocation(TableFunctionInvocation node, SqlPlannerContext context)
{
node.getArguments().stream()
.forEach(argument -> {
if (argument.getValue() instanceof TableFunctionTableArgument) {
throw new SemanticException(NOT_SUPPORTED, argument, "Table arguments are not yet supported for table functions");
}
if (argument.getValue() instanceof TableFunctionDescriptorArgument) {
throw new SemanticException(NOT_SUPPORTED, argument, "Descriptor arguments are not yet supported for table functions");
}
});
Analysis.TableFunctionInvocationAnalysis functionAnalysis = analysis.getTableFunctionAnalysis(node);
ImmutableList.Builder<PlanNode> sources = ImmutableList.builder();
ImmutableList.Builder<TableArgumentProperties> sourceProperties = ImmutableList.builder();
ImmutableList.Builder<VariableReferenceExpression> outputVariables = ImmutableList.builder();

// create new symbols for table function's proper columns
RelationType relationType = analysis.getScope(node).getRelationType();
List<VariableReferenceExpression> properOutputs = IntStream.range(0, functionAnalysis.getProperColumnsCount())
.mapToObj(relationType::getFieldByIndex)
.map(field -> variableAllocator.newVariable(getSourceLocation(node), field.getName().get(), field.getType()))
.collect(toImmutableList());

// TODO handle input relations:
// 1. extract the input relations from node.getArguments() and plan them. Apply relation coercions if requested.
// 2. for each input relation, prepare the TableArgumentProperties record, consisting of:
// - row or set semantics (from the actualArgument)
// - prune when empty property (from the actualArgument)
// - pass through columns property (from the actualArgument)
// - optional Specification: ordering scheme and partitioning (from the node's argument) <- planned upon the source's RelationPlan (or combined RelationPlan from all sources)
// TODO add - argument name
// TODO add - mapping column name => Symbol // TODO mind the fields without names and duplicate field names in RelationType
List<RelationPlan> sources = ImmutableList.of();
List<TableFunctionNode.TableArgumentProperties> inputRelationsProperties = ImmutableList.of();
outputVariables.addAll(properOutputs);

// process sources in order of argument declarations
for (Analysis.TableArgumentAnalysis tableArgument : functionAnalysis.getTableArgumentAnalyses()) {
RelationPlan sourcePlan = process(tableArgument.getRelation(), context);
PlanBuilder sourcePlanBuilder = initializePlanBuilder(sourcePlan);

// map column names to symbols
// note: hidden columns are included in the mapping. They are present both in sourceDescriptor.allFields, and in sourcePlan.fieldMappings
// note: for an aliased relation or a CTE, the field names in the relation type are in the same case as specified in the alias.
// quotes and canonicalization rules are not applied.
ImmutableMultimap.Builder<String, VariableReferenceExpression> columnMapping = ImmutableMultimap.builder();
RelationType sourceDescriptor = sourcePlan.getDescriptor();
for (int i = 0; i < sourceDescriptor.getAllFieldCount(); i++) {
Optional<String> name = sourceDescriptor.getFieldByIndex(i).getName();
if (name.isPresent()) {
columnMapping.put(name.get(), sourcePlan.getVariable(i));
Optional<List<Expression>> partitionBy = tableArgument.getPartitionBy();
if (partitionBy.isPresent()) {
sourcePlanBuilder.getTranslations().put(partitionBy.get().get(i), sourcePlan.getVariable(i));
}
}
}

Scope scope = analysis.getScope(node);
Optional<DataOrganizationSpecification> specification = Optional.empty();

// if the table argument has set semantics, create Specification
if (!tableArgument.isRowSemantics()) {
// partition by
List<VariableReferenceExpression> partitionBy = ImmutableList.of();
// if there are partitioning columns, they might have to be coerced for copartitioning
if (tableArgument.getPartitionBy().isPresent() && !tableArgument.getPartitionBy().get().isEmpty()) {
List<Expression> partitioningColumns = tableArgument.getPartitionBy().get();
QueryPlanner partitionQueryPlanner = new QueryPlanner(analysis, variableAllocator, idAllocator, lambdaDeclarationToVariableMap, metadata, session, context, sqlParser);
QueryPlanner.PlanAndMappings copartitionCoercions = partitionQueryPlanner.coerce(sourcePlanBuilder, partitioningColumns, analysis, idAllocator, variableAllocator, metadata);
sourcePlanBuilder = copartitionCoercions.getSubPlan();
partitionBy = partitioningColumns.stream()
.map(copartitionCoercions::get)
.collect(toImmutableList());
}

ImmutableList.Builder<VariableReferenceExpression> outputVariablesBuilder = ImmutableList.builder();
for (Field field : scope.getRelationType().getAllFields()) {
VariableReferenceExpression variable = variableAllocator.newVariable(getSourceLocation(node), field.getName().get(), field.getType());
outputVariablesBuilder.add(variable);
// order by
Optional<OrderingScheme> orderBy = Optional.empty();
if (tableArgument.getOrderBy().isPresent()) {
// the ordering symbols are not coerced
orderBy = Optional.of(translateOrderingScheme(tableArgument.getOrderBy().get().getSortItems(), sourcePlanBuilder::translate));
}

specification = Optional.of(new DataOrganizationSpecification(partitionBy, orderBy));
}

sources.add(sourcePlanBuilder.getRoot());
sourceProperties.add(new TableArgumentProperties(
tableArgument.getArgumentName(),
columnMapping.build(),
tableArgument.isRowSemantics(),
tableArgument.isPruneWhenEmpty(),
tableArgument.isPassThroughColumns(),
specification));

// add output symbols passed from the table argument
if (tableArgument.isPassThroughColumns()) {
// the original output symbols from the source node, not coerced
// note: hidden columns are included. They are present in sourcePlan.fieldMappings
outputVariables.addAll(sourcePlan.getFieldMappings());
}
else if (tableArgument.getPartitionBy().isPresent()) {
tableArgument.getPartitionBy().get().stream()
// the original symbols for partitioning columns, not coerced
.map(sourcePlanBuilder::translate)
.forEach(outputVariables::add);
}
}

List<VariableReferenceExpression> outputVariables = outputVariablesBuilder.build();
PlanNode root = new TableFunctionNode(
idAllocator.getNextId(),
functionAnalysis.getFunctionName(),
functionAnalysis.getArguments(),
outputVariablesBuilder.build(),
sources.stream().map(RelationPlan::getRoot).collect(toImmutableList()),
inputRelationsProperties,
properOutputs,
sources.build(),
sourceProperties.build(),
functionAnalysis.getCopartitioningLists(),
new TableFunctionHandle(functionAnalysis.getConnectorId(), functionAnalysis.getConnectorTableFunctionHandle(), functionAnalysis.getTransactionHandle()));

return new RelationPlan(root, scope, outputVariables);
return new RelationPlan(root, analysis.getScope(node), outputVariables.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.DataOrganizationSpecification;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.Ordering;
Expand All @@ -27,7 +28,6 @@
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.WindowNode.Specification;
import com.facebook.presto.spi.relation.CallExpression;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
Expand Down Expand Up @@ -307,7 +307,7 @@ public Optional<DecorrelationResult> visitTopN(TopNNode node, Void context)
decorrelatedChildNode.getSourceLocation(),
node.getId(),
decorrelatedChildNode,
new Specification(
new DataOrganizationSpecification(
ImmutableList.copyOf(childDecorrelationResult.variablesToPropagate),
Optional.of(orderingScheme)),
variableAllocator.newVariable("row_number", BIGINT),
Expand Down
Loading