From bc35a65179be1ad1331da458a1441fed60ac975a Mon Sep 17 00:00:00 2001 From: mohsaka <135669458+mohsaka@users.noreply.github.com> Date: Mon, 11 May 2026 15:46:54 -0700 Subject: [PATCH 1/2] feat(native): Support sorted by during write to Iceberg tables #26182 Co-authored-by: Ping Liu --- presto-iceberg/pom.xml | 7 +- .../iceberg/IcebergAbstractMetadata.java | 8 +- .../presto/iceberg/IcebergColumnHandle.java | 10 ++ .../presto/iceberg/IcebergHiveMetadata.java | 1 + .../presto/iceberg/IcebergNativeMetadata.java | 1 + .../presto/iceberg/IcebergPageSink.java | 1 + .../iceberg/IcebergPageSourceProvider.java | 8 +- .../facebook/presto/iceberg/IcebergUtil.java | 18 +- .../changelog/ChangelogSplitSource.java | 2 +- presto-native-execution/Makefile | 6 +- .../connectors/HivePrestoToVeloxConnector.cpp | 6 +- .../connectors/HivePrestoToVeloxConnector.h | 6 +- .../IcebergPrestoToVeloxConnector.cpp | 164 ++++++++++++++++++ .../IcebergPrestoToVeloxConnector.h | 31 +++- .../main/connectors/PrestoToVeloxConnector.h | 6 +- .../main/types/PrestoToVeloxQueryPlan.cpp | 8 +- 16 files changed, 252 insertions(+), 31 deletions(-) diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index 1b0d79001419e..36ec7b5d9821a 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -619,7 +619,6 @@ com.facebook.presto presto-main-base - test com.facebook.presto @@ -634,12 +633,10 @@ com.facebook.presto presto-parser - test com.facebook.presto presto-analyzer - test @@ -782,7 +779,6 @@ org.apache.commons commons-math3 - test @@ -798,6 +794,9 @@ org.apache.httpcomponents.core5:httpcore5 com.amazonaws:aws-java-sdk-s3 com.amazonaws:aws-java-sdk-core + org.apache.commons:commons-math3 + com.facebook.presto:presto-parser + com.facebook.presto:presto-analyzer diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index 136a6f61ff57b..7c5c48b29c266 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -528,7 +528,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa predicateColumns)); // capture subfields from domainPredicate to add to remainingPredicate // so those filters don't get lost - Map columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream() + Map columnTypes = getColumns(session, icebergTable.schema(), icebergTable.spec(), typeManager).stream() .collect(toImmutableMap(IcebergColumnHandle::getName, icebergColumnHandle -> getColumnMetadata(session, tableHandle, icebergColumnHandle).getType())); RowExpression subfieldPredicate = getSubfieldPredicate(session, icebergTableLayoutHandle, columnTypes, functionResolution, rowExpressionService); @@ -932,7 +932,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT icebergTableHandle.getIcebergTableName(), toPrestoSchema(icebergTable.schema(), typeManager), toPrestoPartitionSpec(icebergTable.spec(), typeManager), - getColumns(icebergTable.schema(), icebergTable.spec(), typeManager), + getColumns(session, icebergTable.schema(), icebergTable.spec(), typeManager), icebergTable.location(), getFileFormat(icebergTable), getCompressionCodec(session), @@ -1336,7 +1336,7 @@ public Map getColumnHandles(ConnectorSession session, Conn } ImmutableMap.Builder columnHandles = ImmutableMap.builder(); - for (IcebergColumnHandle columnHandle : getColumns(schema, icebergTable.spec(), typeManager)) { + for (IcebergColumnHandle columnHandle : getColumns(session, schema, icebergTable.spec(), typeManager)) { columnHandles.put(columnHandle.getName(), columnHandle); } if (table.getIcebergTableName().getTableType() != CHANGELOG) { @@ -2344,7 +2344,7 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView( storageTableHandle.getIcebergTableName(), toPrestoSchema(storageTable.schema(), typeManager), toPrestoPartitionSpec(storageTable.spec(), typeManager), - getColumns(storageTable.schema(), storageTable.spec(), typeManager), + getColumns(session, storageTable.schema(), storageTable.spec(), typeManager), storageTable.location(), getFileFormat(storageTable), getCompressionCodec(session), diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java index 1ada5a522791e..de8da710ec723 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java @@ -259,6 +259,16 @@ public static IcebergColumnHandle create(Types.NestedField column, TypeManager t columnType); } + public static IcebergColumnHandle create(int partitionFieldId, Types.NestedField column, TypeManager typeManager, ColumnType columnType) + { + return new IcebergColumnHandle( + createColumnIdentity(column.name(), partitionFieldId, column.type()), + toPrestoType(column.type(), typeManager), + Optional.ofNullable(column.doc()), + columnType); + } + + public static Subfield getPushedDownSubfield(IcebergColumnHandle column) { checkArgument(isPushedDownSubfield(column), format("not a valid pushed down subfield: type=%s, subfields=%s", column.getColumnType(), column.getRequiredSubfields())); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 863760b7be87b..5d6aec94bb5dc 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -437,6 +437,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty(), Optional.empty()), toPrestoSchema(metadata.schema(), typeManager), toPrestoPartitionSpec(metadata.spec(), typeManager), + // TODO: Changed from getColumns to getColumnsForWrite getColumnsForWrite(metadata.schema(), metadata.spec(), typeManager), targetPath, fileFormat, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index e47cb76abdf50..2688d6c9a8c48 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -479,6 +479,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty(), Optional.empty()), toPrestoSchema(icebergTable.schema(), typeManager), toPrestoPartitionSpec(icebergTable.spec(), typeManager), + // Changed to getColumnsForWrite getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager), icebergTable.location(), fileFormat, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java index 24f2aa7f32c86..ea011d961d9a7 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java @@ -159,6 +159,7 @@ public IcebergPageSink( this.sortOrder = requireNonNull(sortOrder, "sortOrder is null"); String tempDirectoryPath = locationProvider.newDataLocation("sort-tmp-files"); this.tempDirectory = new Path(tempDirectoryPath); + // Changed to getColumnsForWrite List types = getColumnsForWrite(outputSchema, partitionSpec, requireNonNull(sortParameters.getTypeManager(), "typeManager is null")).stream() .map(IcebergColumnHandle::getType) .collect(toImmutableList()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java index 62d0d01c7b7b4..fe96a9e060576 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java @@ -778,7 +778,7 @@ public ConnectorPageSource createPageSource( // add any additional columns which may need to be read from storage // by delete filters boolean equalityDeletesRequired = table.getIcebergTableName().getTableType() == IcebergTableType.DATA; - requiredColumnsForDeletes(tableSchema, partitionSpec, split.getDeletes(), equalityDeletesRequired) + requiredColumnsForDeletes(session, tableSchema, partitionSpec, split.getDeletes(), equalityDeletesRequired) .stream() .filter(not(icebergColumns::contains)) .forEach(columnsToReadFromStorage::add); @@ -918,7 +918,7 @@ else if (subColumn.getId() == MERGE_PARTITION_DATA.getId()) { pageIndexerFactory, hdfsEnvironment, hdfsContext, - getColumns(tableSchema, partitionSpec, typeManager), + getColumns(session, tableSchema, partitionSpec, typeManager), jsonCodec, session, split.getFileFormat(), @@ -948,7 +948,7 @@ else if (subColumn.getId() == MERGE_PARTITION_DATA.getId()) { return dataSource; } - private Set requiredColumnsForDeletes(Schema schema, + private Set requiredColumnsForDeletes(ConnectorSession session, Schema schema, PartitionSpec partitionSpec, List deletes, boolean equalityDeletesRequired) @@ -959,7 +959,7 @@ private Set requiredColumnsForDeletes(Schema schema, requiredColumns.add(IcebergColumnHandle.create(ROW_POSITION, typeManager, IcebergColumnHandle.ColumnType.REGULAR)); } else if (deleteFile.content() == EQUALITY_DELETES && equalityDeletesRequired) { - getColumns(deleteFile.equalityFieldIds().stream(), schema, partitionSpec, typeManager) + getColumns(session, deleteFile.equalityFieldIds().stream(), schema, partitionSpec, typeManager) .forEach(requiredColumns::add); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index c3a051ab1f334..1ad5f19ca9f5e 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -15,11 +15,13 @@ import com.facebook.airlift.log.Logger; import com.facebook.airlift.units.DataSize; +import com.facebook.presto.FullConnectorSession; import com.facebook.presto.common.GenericInternalException; import com.facebook.presto.common.RuntimeStats; import com.facebook.presto.common.predicate.Domain; import com.facebook.presto.common.predicate.NullableValue; import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.resourceGroups.QueryType; import com.facebook.presto.common.type.DecimalType; import com.facebook.presto.common.type.Decimals; import com.facebook.presto.common.type.TimestampType; @@ -147,6 +149,7 @@ import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP; import static com.facebook.presto.iceberg.IcebergMetadataColumn.isMetadataColumnId; +import static com.facebook.presto.iceberg.IcebergPartitionType.ALL; import static com.facebook.presto.iceberg.IcebergPartitionType.IDENTITY; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled; @@ -398,14 +401,21 @@ public static Map> getPartitionFields(PartitionSpec partiti return partitionFields; } - public static List getColumns(Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) + public static List getColumns(ConnectorSession session, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) { - return getColumns(schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager); + return getColumns(session, schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager); } - public static List getColumns(Stream fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) + public static List getColumns(ConnectorSession session, Stream fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) { - Set partitionFieldNames = getPartitionFields(partitionSpec, IDENTITY).keySet(); + IcebergPartitionType partitionType = IDENTITY; + if (session instanceof FullConnectorSession) { + Optional queryType = ((FullConnectorSession) session).getSession().getQueryType(); + if (queryType.isPresent() && queryType.get() == QueryType.INSERT) { + partitionType = ALL; + } + } + Set partitionFieldNames = getPartitionFields(partitionSpec, partitionType).keySet(); return fields .map(schema::findField) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java index 4006b331876b6..5eec3442fe002 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java @@ -82,7 +82,7 @@ public ChangelogSplitSource( { requireNonNull(session, "session is null"); requireNonNull(typeManager, "typeManager is null"); - this.columnHandles = getColumns(table.schema(), table.spec(), typeManager); + this.columnHandles = getColumns(session, table.schema(), table.spec(), typeManager); this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session); this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes(); this.nodeSelectionStrategy = getNodeSelectionStrategy(session); diff --git a/presto-native-execution/Makefile b/presto-native-execution/Makefile index 9397a4c632daa..8e6f704b87d23 100644 --- a/presto-native-execution/Makefile +++ b/presto-native-execution/Makefile @@ -114,9 +114,9 @@ release: #: Build the release version $(MAKE) cmake BUILD_DIR=release BUILD_TYPE=Release && \ $(MAKE) build BUILD_DIR=release -cmake-and-build: #: cmake and build without updating submodules which requires git - cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS) && \ - cmake --build $(BUILD_BASE_DIR)/$(BUILD_DIR) -j $(NUM_THREADS) +cmake-and-build: #: cmake and build without updating submodules which requires git + cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS) && \ + cmake --build $(BUILD_BASE_DIR)/$(BUILD_DIR) -j $(NUM_THREADS) unittest: debug #: Build with debugging and run unit tests cd $(BUILD_BASE_DIR)/debug && ctest -j $(NUM_THREADS) -VV --output-on-failure --exclude-regex ^velox.* diff --git a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp index e2d5891ebf0d6..34c67caa9fed3 100644 --- a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp @@ -395,7 +395,8 @@ HivePrestoToVeloxConnector::toVeloxTableHandle( std::unique_ptr HivePrestoToVeloxConnector::toVeloxInsertTableHandle( const protocol::CreateHandle* createHandle, - const TypeParser& typeParser) const { + const TypeParser& typeParser, + memory::MemoryPool* pool) const { auto hiveOutputTableHandle = std::dynamic_pointer_cast( createHandle->handle.connectorHandle); @@ -423,7 +424,8 @@ HivePrestoToVeloxConnector::toVeloxInsertTableHandle( std::unique_ptr HivePrestoToVeloxConnector::toVeloxInsertTableHandle( const protocol::InsertHandle* insertHandle, - const TypeParser& typeParser) const { + const TypeParser& typeParser, const { + velox::memory::MemoryPool* pool) auto hiveInsertTableHandle = std::dynamic_pointer_cast( insertHandle->handle.connectorHandle); diff --git a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h index fdbc4fc6d4f6c..c91e5d5b0573a 100644 --- a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h +++ b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h @@ -46,12 +46,14 @@ class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector { std::unique_ptr toVeloxInsertTableHandle( const protocol::CreateHandle* createHandle, - const TypeParser& typeParser) const final; + const TypeParser& typeParser, + velox::memory::MemoryPool* pool) const final; std::unique_ptr toVeloxInsertTableHandle( const protocol::InsertHandle* insertHandle, - const TypeParser& typeParser) const final; + const TypeParser& typeParser, + velox::memory::MemoryPool* pool) const final; std::unique_ptr createVeloxPartitionFunctionSpec( diff --git a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp index d1ff56b3c7585..a6dc171589b14 100644 --- a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp @@ -243,11 +243,29 @@ IcebergPrestoToVeloxConnector::toVeloxColumnHandle( velox::type::fbhive::HiveTypeParser hiveTypeParser; auto type = stringToType(icebergColumn->type, typeParser); + std::function + collectNestedField = [&](const protocol::iceberg::ColumnIdentity* column) + -> connector::hive::iceberg::IcebergNestedField { + std::vector children; + if (!column->children.empty()) { + children.reserve(column->children.size()); + for (const auto& child : column->children) { + children.push_back(collectNestedField(&child)); + } + } + auto type = stringToType(icebergColumn->type, typeParser); + return connector::hive::iceberg::IcebergNestedField(column->id, children); + }; + + auto nestedField = collectNestedField(&icebergColumn->columnIdentity); + return std::make_unique( icebergColumn->columnIdentity.name, toHiveColumnType(icebergColumn->columnType), type, toParquetField(icebergColumn->columnIdentity), + nestedField, toRequiredSubfields(icebergColumn->requiredSubfields)); } @@ -407,6 +425,152 @@ IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle( toFileCompressionKind(icebergInsertTableHandle->compressionCodec))); } +std::unique_ptr +IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle( + const protocol::CreateHandle* createHandle, + const TypeParser& typeParser, + velox::memory::MemoryPool* pool) const { + auto icebergOutputTableHandle = + std::dynamic_pointer_cast( + createHandle->handle.connectorHandle); + + VELOX_CHECK_NOT_NULL( + icebergOutputTableHandle, + "Unexpected output table handle type {}", + createHandle->handle.connectorHandle->_type); + + const auto inputColumns = + toIcebergColumns(icebergOutputTableHandle->inputColumns, typeParser); + + auto sortedBy = toIcebergSortingColumns( + icebergOutputTableHandle->sortOrder, icebergOutputTableHandle->schema); + + return std::make_unique< + velox::connector::hive::iceberg::IcebergInsertTableHandle>( + inputColumns, + std::make_shared( + fmt::format("{}/data", icebergOutputTableHandle->outputPath), + fmt::format("{}/data", icebergOutputTableHandle->outputPath), + connector::hive::LocationHandle::TableType::kNew), + toVeloxIcebergPartitionSpec( + icebergOutputTableHandle->partitionSpec, typeParser), + pool, + toVeloxFileFormat(icebergOutputTableHandle->fileFormat), + std::move(sortedBy), + std::optional( + toFileCompressionKind(icebergOutputTableHandle->compressionCodec))); +} + +std::vector +IcebergPrestoToVeloxConnector::toIcebergSortingColumns( + protocol::List sortFields, + const protocol::iceberg::PrestoIcebergSchema& schema) const { + std::vector sortedBy; + sortedBy.reserve(sortFields.size()); + for (const auto& sortField : sortFields) { + velox::core::SortOrder veloxSortOrder( + sortField.sortOrder == protocol::SortOrder::ASC_NULLS_LAST || + sortField.sortOrder == protocol::SortOrder::ASC_NULLS_FIRST, + sortField.sortOrder == protocol::SortOrder::DESC_NULLS_FIRST || + sortField.sortOrder == protocol::SortOrder::ASC_NULLS_FIRST); + + for (const auto& column : schema.columns) { + if (column.id == sortField.sourceColumnId) { + sortedBy.emplace_back( + connector::hive::iceberg::IcebergSortingColumn( + column.name, veloxSortOrder)); + break; + } + } + } + return sortedBy; +} + +std::unique_ptr +IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle( + const protocol::InsertHandle* insertHandle, + const TypeParser& typeParser, + velox::memory::MemoryPool* pool) const { + auto icebergInsertTableHandle = + std::dynamic_pointer_cast( + insertHandle->handle.connectorHandle); + + VELOX_CHECK_NOT_NULL( + icebergInsertTableHandle, + "Unexpected insert table handle type {}", + insertHandle->handle.connectorHandle->_type); + + const auto inputColumns = + toIcebergColumns(icebergInsertTableHandle->inputColumns, typeParser); + + auto sortedBy = toIcebergSortingColumns( + icebergInsertTableHandle->sortOrder, icebergInsertTableHandle->schema); + + return std::make_unique( + inputColumns, + std::make_shared( + fmt::format("{}/data", icebergInsertTableHandle->outputPath), + fmt::format("{}/data", icebergInsertTableHandle->outputPath), + connector::hive::LocationHandle::TableType::kExisting), + toVeloxIcebergPartitionSpec( + icebergInsertTableHandle->partitionSpec, typeParser), + pool, + toVeloxFileFormat(icebergInsertTableHandle->fileFormat), + std::move(sortedBy), + std::optional( + toFileCompressionKind(icebergInsertTableHandle->compressionCodec))); +} + +std::vector< + std::shared_ptr> +IcebergPrestoToVeloxConnector::toIcebergColumns( + const protocol::List& inputColumns, + const TypeParser& typeParser) const { + std::vector< + std::shared_ptr> + icebergColumns; + icebergColumns.reserve(inputColumns.size()); + for (const auto& columnHandle : inputColumns) { + icebergColumns.emplace_back( + std::dynamic_pointer_cast< + connector::hive::iceberg::IcebergColumnHandle>( + std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser)))); + } + return icebergColumns; +} + +connector::hive::iceberg::IcebergPartitionSpec::Field +IcebergPrestoToVeloxConnector::toVeloxIcebergPartitionField( + const protocol::iceberg::IcebergPartitionField& field, + const facebook::presto::TypeParser& typeParser, + const protocol::iceberg::PrestoIcebergSchema& schema) const { + std::string type; + for (const auto& column : schema.columns) { + if (column.name == field.name) { + type = column.prestoType; + } + } + return connector::hive::iceberg::IcebergPartitionSpec::Field( + field.name, + stringToType(type, typeParser), + static_cast(field.transform), + field.parameter ? *field.parameter : std::optional()); +} + +std::unique_ptr +IcebergPrestoToVeloxConnector::toVeloxIcebergPartitionSpec( + const protocol::iceberg::PrestoIcebergPartitionSpec& spec, + const facebook::presto::TypeParser& typeParser) const { + std::vector fields; + fields.reserve(spec.fields.size()); + for (auto field : spec.fields) { + fields.emplace_back( + toVeloxIcebergPartitionField(field, typeParser, spec.schema)); + } + return std::make_unique( + spec.specId, fields); +} + std::vector IcebergPrestoToVeloxConnector::toIcebergColumns( const protocol::List& inputColumns, diff --git a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h index 193efa74d6008..b6c2d1b9298f3 100644 --- a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h +++ b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h @@ -16,8 +16,9 @@ #include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" #include "presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h" - #include "velox/connectors/hive/iceberg/IcebergColumnHandle.h" +#include "velox/connectors/hive/iceberg/IcebergDataSink.h" +#include "velox/connectors/hive/iceberg/PartitionSpec.h" namespace facebook::presto { @@ -58,12 +59,40 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector { const protocol::InsertHandle* insertHandle, const TypeParser& typeParser) const final; + std::unique_ptr + toVeloxInsertTableHandle( + const protocol::CreateHandle* createHandle, + const TypeParser& typeParser, + velox::memory::MemoryPool* pool) const final; + + std::unique_ptr + toVeloxInsertTableHandle( + const protocol::InsertHandle* insertHandle, + const TypeParser& typeParser, + velox::memory::MemoryPool* pool) const final; + private: std::vector toIcebergColumns( const protocol::List& inputColumns, const TypeParser& typeParser) const; + + std::vector + toIcebergSortingColumns( + protocol::List, + const protocol::iceberg::PrestoIcebergSchema& schema) const; + + velox::connector::hive::iceberg::IcebergPartitionSpec::Field + toVeloxIcebergPartitionField( + const protocol::iceberg::IcebergPartitionField& filed, + const facebook::presto::TypeParser& typeParser, + const protocol::iceberg::PrestoIcebergSchema& schema) const; + + std::unique_ptr + toVeloxIcebergPartitionSpec( + const protocol::iceberg::PrestoIcebergPartitionSpec& spec, + const TypeParser& typeParser) const; }; } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h index 43988195634db..e60ff3c2d575d 100644 --- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h @@ -77,7 +77,8 @@ class PrestoToVeloxConnector { velox::connector::ConnectorInsertTableHandle> toVeloxInsertTableHandle( const protocol::CreateHandle* createHandle, - const TypeParser& typeParser) const { + const TypeParser& typeParser, + velox::memory::MemoryPool* pool) const { return {}; } @@ -85,7 +86,8 @@ class PrestoToVeloxConnector { velox::connector::ConnectorInsertTableHandle> toVeloxInsertTableHandle( const protocol::InsertHandle* insertHandle, - const TypeParser& typeParser) const { + const TypeParser& typeParser, + velox::memory::MemoryPool* pool) const { return {}; } diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index ba8dc6913bd90..5861e11de4a83 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -1563,8 +1563,8 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan( connectorId = createHandle->handle.connectorId; auto& connector = getPrestoToVeloxConnector(createHandle->handle.connectorHandle->_type); - auto veloxHandle = - connector.toVeloxInsertTableHandle(createHandle.get(), typeParser_); + auto veloxHandle = connector.toVeloxInsertTableHandle( + createHandle.get(), typeParser_, pool_); connectorInsertHandle = std::shared_ptr(std::move(veloxHandle)); } else if ( const auto insertHandle = @@ -1573,8 +1573,8 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan( connectorId = insertHandle->handle.connectorId; auto& connector = getPrestoToVeloxConnector(insertHandle->handle.connectorHandle->_type); - auto veloxHandle = - connector.toVeloxInsertTableHandle(insertHandle.get(), typeParser_); + auto veloxHandle = connector.toVeloxInsertTableHandle( + insertHandle.get(), typeParser_, pool_); connectorInsertHandle = std::shared_ptr(std::move(veloxHandle)); } From 1da89e53bb727f8dd1a4c3771cd44da6e08c245f Mon Sep 17 00:00:00 2001 From: mohsaka <135669458+mohsaka@users.noreply.github.com> Date: Mon, 11 May 2026 15:32:32 -0700 Subject: [PATCH 2/2] Fix rebase issues --- presto-iceberg/pom.xml | 4 ++-- .../iceberg/IcebergAbstractMetadata.java | 2 +- .../presto/iceberg/IcebergColumnHandle.java | 1 - .../presto/iceberg/IcebergHiveMetadata.java | 2 +- .../presto/iceberg/IcebergNativeMetadata.java | 2 +- .../presto/iceberg/IcebergPageSink.java | 2 +- .../facebook/presto/iceberg/IcebergUtil.java | 19 ++++++++++++------- .../procedure/RewriteDataFilesProcedure.java | 2 +- presto-native-execution/Makefile | 4 ++-- 9 files changed, 21 insertions(+), 17 deletions(-) diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index 36ec7b5d9821a..e9745965caded 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -608,7 +608,6 @@ jakarta.servlet jakarta.servlet-api - test @@ -705,7 +704,6 @@ org.jetbrains annotations - test @@ -797,6 +795,8 @@ org.apache.commons:commons-math3 com.facebook.presto:presto-parser com.facebook.presto:presto-analyzer + org.jetbrains:annotations + jakarta.servlet:jakarta.servlet-api diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index 7c5c48b29c266..7b202a400990a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -689,7 +689,7 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession se table.getIcebergTableName(), toPrestoSchema(icebergTable.schema(), typeManager), toPrestoPartitionSpec(icebergTable.spec(), typeManager), - getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager), + getColumnsForWrite(session, icebergTable.schema(), icebergTable.spec(), typeManager), icebergTable.location(), getFileFormat(icebergTable), getCompressionCodec(session), diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java index de8da710ec723..8d053d7196f72 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java @@ -268,7 +268,6 @@ public static IcebergColumnHandle create(int partitionFieldId, Types.NestedField columnType); } - public static Subfield getPushedDownSubfield(IcebergColumnHandle column) { checkArgument(isPushedDownSubfield(column), format("not a valid pushed down subfield: type=%s, subfields=%s", column.getColumnType(), column.getRequiredSubfields())); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 5d6aec94bb5dc..3ee1d4e15c659 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -438,7 +438,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con toPrestoSchema(metadata.schema(), typeManager), toPrestoPartitionSpec(metadata.spec(), typeManager), // TODO: Changed from getColumns to getColumnsForWrite - getColumnsForWrite(metadata.schema(), metadata.spec(), typeManager), + getColumnsForWrite(session, metadata.schema(), metadata.spec(), typeManager), targetPath, fileFormat, getCompressionCodec(session), diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index 2688d6c9a8c48..c0919a1da5166 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -480,7 +480,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con toPrestoSchema(icebergTable.schema(), typeManager), toPrestoPartitionSpec(icebergTable.spec(), typeManager), // Changed to getColumnsForWrite - getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager), + getColumnsForWrite(session, icebergTable.schema(), icebergTable.spec(), typeManager), icebergTable.location(), fileFormat, getCompressionCodec(session), diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java index ea011d961d9a7..4b1e7493bcb20 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java @@ -160,7 +160,7 @@ public IcebergPageSink( String tempDirectoryPath = locationProvider.newDataLocation("sort-tmp-files"); this.tempDirectory = new Path(tempDirectoryPath); // Changed to getColumnsForWrite - List types = getColumnsForWrite(outputSchema, partitionSpec, requireNonNull(sortParameters.getTypeManager(), "typeManager is null")).stream() + List types = getColumnsForWrite(session, outputSchema, partitionSpec, requireNonNull(sortParameters.getTypeManager(), "typeManager is null")).stream() .map(IcebergColumnHandle::getType) .collect(toImmutableList()); this.sortParameters = sortParameters; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 1ad5f19ca9f5e..0a109ea67bb55 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -425,20 +425,25 @@ public static List getColumns(ConnectorSession session, Str .collect(toImmutableList()); } - public static List getColumnsForWrite(Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) + public static List getColumnsForWrite(ConnectorSession session, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) { - return getColumnsForWrite(schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager); + return getColumnsForWrite(session, schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager); } - private static List getColumnsForWrite(Stream fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) + private static List getColumnsForWrite(ConnectorSession session, Stream fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) { - Set partitionSourceIds = partitionSpec.fields().stream() - .map(PartitionField::sourceId) - .collect(toImmutableSet()); + IcebergPartitionType partitionType = IDENTITY; + if (session instanceof FullConnectorSession) { + Optional queryType = ((FullConnectorSession) session).getSession().getQueryType(); + if (queryType.isPresent() && queryType.get() == QueryType.INSERT) { + partitionType = ALL; + } + } + Set partitionFieldNames = getPartitionFields(partitionSpec, partitionType).keySet(); return fields .map(schema::findField) - .map(column -> partitionSourceIds.contains(column.fieldId()) ? + .map(column -> partitionFieldNames.contains(column.name()) ? IcebergColumnHandle.create(column, typeManager, PARTITION_KEY) : IcebergColumnHandle.create(column, typeManager, REGULAR)) .collect(toImmutableList()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java index 0866f26ceb156..9e13a5bccea50 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java @@ -197,7 +197,7 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec tableHandle.getIcebergTableName(), toPrestoSchema(icebergTable.schema(), typeManager), toPrestoPartitionSpec(icebergTable.spec(), typeManager), - getColumns(icebergTable.schema(), icebergTable.spec(), typeManager), + getColumns(session, icebergTable.schema(), icebergTable.spec(), typeManager), icebergTable.location(), getFileFormat(icebergTable), getCompressionCodec(session), diff --git a/presto-native-execution/Makefile b/presto-native-execution/Makefile index 8e6f704b87d23..a0184dde77199 100644 --- a/presto-native-execution/Makefile +++ b/presto-native-execution/Makefile @@ -115,8 +115,8 @@ release: #: Build the release version $(MAKE) build BUILD_DIR=release cmake-and-build: #: cmake and build without updating submodules which requires git - cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS) && \ - cmake --build $(BUILD_BASE_DIR)/$(BUILD_DIR) -j $(NUM_THREADS) + cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS) && \ + cmake --build $(BUILD_BASE_DIR)/$(BUILD_DIR) -j $(NUM_THREADS) unittest: debug #: Build with debugging and run unit tests cd $(BUILD_BASE_DIR)/debug && ctest -j $(NUM_THREADS) -VV --output-on-failure --exclude-regex ^velox.*