diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml
index 1b0d79001419e..e9745965caded 100644
--- a/presto-iceberg/pom.xml
+++ b/presto-iceberg/pom.xml
@@ -608,7 +608,6 @@
jakarta.servlet
jakarta.servlet-api
- test
@@ -619,7 +618,6 @@
com.facebook.presto
presto-main-base
- test
com.facebook.presto
@@ -634,12 +632,10 @@
com.facebook.presto
presto-parser
- test
com.facebook.presto
presto-analyzer
- test
@@ -708,7 +704,6 @@
org.jetbrains
annotations
- test
@@ -782,7 +777,6 @@
org.apache.commons
commons-math3
- test
@@ -798,6 +792,11 @@
org.apache.httpcomponents.core5:httpcore5
com.amazonaws:aws-java-sdk-s3
com.amazonaws:aws-java-sdk-core
+ org.apache.commons:commons-math3
+ com.facebook.presto:presto-parser
+ com.facebook.presto:presto-analyzer
+ org.jetbrains:annotations
+ jakarta.servlet:jakarta.servlet-api
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
index 136a6f61ff57b..7b202a400990a 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
@@ -528,7 +528,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
predicateColumns));
// capture subfields from domainPredicate to add to remainingPredicate
// so those filters don't get lost
- Map columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream()
+ Map columnTypes = getColumns(session, icebergTable.schema(), icebergTable.spec(), typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getName, icebergColumnHandle -> getColumnMetadata(session, tableHandle, icebergColumnHandle).getType()));
RowExpression subfieldPredicate = getSubfieldPredicate(session, icebergTableLayoutHandle, columnTypes, functionResolution, rowExpressionService);
@@ -689,7 +689,7 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession se
table.getIcebergTableName(),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
- getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager),
+ getColumnsForWrite(session, icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
@@ -932,7 +932,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
icebergTableHandle.getIcebergTableName(),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
- getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
+ getColumns(session, icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
@@ -1336,7 +1336,7 @@ public Map getColumnHandles(ConnectorSession session, Conn
}
ImmutableMap.Builder columnHandles = ImmutableMap.builder();
- for (IcebergColumnHandle columnHandle : getColumns(schema, icebergTable.spec(), typeManager)) {
+ for (IcebergColumnHandle columnHandle : getColumns(session, schema, icebergTable.spec(), typeManager)) {
columnHandles.put(columnHandle.getName(), columnHandle);
}
if (table.getIcebergTableName().getTableType() != CHANGELOG) {
@@ -2344,7 +2344,7 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(
storageTableHandle.getIcebergTableName(),
toPrestoSchema(storageTable.schema(), typeManager),
toPrestoPartitionSpec(storageTable.spec(), typeManager),
- getColumns(storageTable.schema(), storageTable.spec(), typeManager),
+ getColumns(session, storageTable.schema(), storageTable.spec(), typeManager),
storageTable.location(),
getFileFormat(storageTable),
getCompressionCodec(session),
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java
index 1ada5a522791e..8d053d7196f72 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java
@@ -259,6 +259,15 @@ public static IcebergColumnHandle create(Types.NestedField column, TypeManager t
columnType);
}
+ public static IcebergColumnHandle create(int partitionFieldId, Types.NestedField column, TypeManager typeManager, ColumnType columnType)
+ {
+ return new IcebergColumnHandle(
+ createColumnIdentity(column.name(), partitionFieldId, column.type()),
+ toPrestoType(column.type(), typeManager),
+ Optional.ofNullable(column.doc()),
+ columnType);
+ }
+
public static Subfield getPushedDownSubfield(IcebergColumnHandle column)
{
checkArgument(isPushedDownSubfield(column), format("not a valid pushed down subfield: type=%s, subfields=%s", column.getColumnType(), column.getRequiredSubfields()));
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
index 863760b7be87b..3ee1d4e15c659 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
@@ -437,7 +437,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty(), Optional.empty()),
toPrestoSchema(metadata.schema(), typeManager),
toPrestoPartitionSpec(metadata.spec(), typeManager),
- getColumnsForWrite(metadata.schema(), metadata.spec(), typeManager),
+ // TODO: Changed from getColumns to getColumnsForWrite
+ getColumnsForWrite(session, metadata.schema(), metadata.spec(), typeManager),
targetPath,
fileFormat,
getCompressionCodec(session),
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java
index e47cb76abdf50..c0919a1da5166 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java
@@ -479,7 +479,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty(), Optional.empty()),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
- getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager),
+ // Changed to getColumnsForWrite
+ getColumnsForWrite(session, icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
fileFormat,
getCompressionCodec(session),
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java
index 24f2aa7f32c86..4b1e7493bcb20 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java
@@ -159,7 +159,8 @@ public IcebergPageSink(
this.sortOrder = requireNonNull(sortOrder, "sortOrder is null");
String tempDirectoryPath = locationProvider.newDataLocation("sort-tmp-files");
this.tempDirectory = new Path(tempDirectoryPath);
- List types = getColumnsForWrite(outputSchema, partitionSpec, requireNonNull(sortParameters.getTypeManager(), "typeManager is null")).stream()
+ // Changed to getColumnsForWrite
+ List types = getColumnsForWrite(session, outputSchema, partitionSpec, requireNonNull(sortParameters.getTypeManager(), "typeManager is null")).stream()
.map(IcebergColumnHandle::getType)
.collect(toImmutableList());
this.sortParameters = sortParameters;
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java
index 62d0d01c7b7b4..fe96a9e060576 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java
@@ -778,7 +778,7 @@ public ConnectorPageSource createPageSource(
// add any additional columns which may need to be read from storage
// by delete filters
boolean equalityDeletesRequired = table.getIcebergTableName().getTableType() == IcebergTableType.DATA;
- requiredColumnsForDeletes(tableSchema, partitionSpec, split.getDeletes(), equalityDeletesRequired)
+ requiredColumnsForDeletes(session, tableSchema, partitionSpec, split.getDeletes(), equalityDeletesRequired)
.stream()
.filter(not(icebergColumns::contains))
.forEach(columnsToReadFromStorage::add);
@@ -918,7 +918,7 @@ else if (subColumn.getId() == MERGE_PARTITION_DATA.getId()) {
pageIndexerFactory,
hdfsEnvironment,
hdfsContext,
- getColumns(tableSchema, partitionSpec, typeManager),
+ getColumns(session, tableSchema, partitionSpec, typeManager),
jsonCodec,
session,
split.getFileFormat(),
@@ -948,7 +948,7 @@ else if (subColumn.getId() == MERGE_PARTITION_DATA.getId()) {
return dataSource;
}
- private Set requiredColumnsForDeletes(Schema schema,
+ private Set requiredColumnsForDeletes(ConnectorSession session, Schema schema,
PartitionSpec partitionSpec,
List deletes,
boolean equalityDeletesRequired)
@@ -959,7 +959,7 @@ private Set requiredColumnsForDeletes(Schema schema,
requiredColumns.add(IcebergColumnHandle.create(ROW_POSITION, typeManager, IcebergColumnHandle.ColumnType.REGULAR));
}
else if (deleteFile.content() == EQUALITY_DELETES && equalityDeletesRequired) {
- getColumns(deleteFile.equalityFieldIds().stream(), schema, partitionSpec, typeManager)
+ getColumns(session, deleteFile.equalityFieldIds().stream(), schema, partitionSpec, typeManager)
.forEach(requiredColumns::add);
}
}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
index c3a051ab1f334..0a109ea67bb55 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
@@ -15,11 +15,13 @@
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.units.DataSize;
+import com.facebook.presto.FullConnectorSession;
import com.facebook.presto.common.GenericInternalException;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.Decimals;
import com.facebook.presto.common.type.TimestampType;
@@ -147,6 +149,7 @@
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.isMetadataColumnId;
+import static com.facebook.presto.iceberg.IcebergPartitionType.ALL;
import static com.facebook.presto.iceberg.IcebergPartitionType.IDENTITY;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled;
@@ -398,14 +401,21 @@ public static Map> getPartitionFields(PartitionSpec partiti
return partitionFields;
}
- public static List getColumns(Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
+ public static List getColumns(ConnectorSession session, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
- return getColumns(schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
+ return getColumns(session, schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
}
- public static List getColumns(Stream fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
+ public static List getColumns(ConnectorSession session, Stream fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
- Set partitionFieldNames = getPartitionFields(partitionSpec, IDENTITY).keySet();
+ IcebergPartitionType partitionType = IDENTITY;
+ if (session instanceof FullConnectorSession) {
+ Optional queryType = ((FullConnectorSession) session).getSession().getQueryType();
+ if (queryType.isPresent() && queryType.get() == QueryType.INSERT) {
+ partitionType = ALL;
+ }
+ }
+ Set partitionFieldNames = getPartitionFields(partitionSpec, partitionType).keySet();
return fields
.map(schema::findField)
@@ -415,20 +425,25 @@ public static List getColumns(Stream fields, Schem
.collect(toImmutableList());
}
- public static List getColumnsForWrite(Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
+ public static List getColumnsForWrite(ConnectorSession session, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
- return getColumnsForWrite(schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
+ return getColumnsForWrite(session, schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
}
- private static List getColumnsForWrite(Stream fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
+ private static List getColumnsForWrite(ConnectorSession session, Stream fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
- Set partitionSourceIds = partitionSpec.fields().stream()
- .map(PartitionField::sourceId)
- .collect(toImmutableSet());
+ IcebergPartitionType partitionType = IDENTITY;
+ if (session instanceof FullConnectorSession) {
+ Optional queryType = ((FullConnectorSession) session).getSession().getQueryType();
+ if (queryType.isPresent() && queryType.get() == QueryType.INSERT) {
+ partitionType = ALL;
+ }
+ }
+ Set partitionFieldNames = getPartitionFields(partitionSpec, partitionType).keySet();
return fields
.map(schema::findField)
- .map(column -> partitionSourceIds.contains(column.fieldId()) ?
+ .map(column -> partitionFieldNames.contains(column.name()) ?
IcebergColumnHandle.create(column, typeManager, PARTITION_KEY) :
IcebergColumnHandle.create(column, typeManager, REGULAR))
.collect(toImmutableList());
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java
index 4006b331876b6..5eec3442fe002 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java
@@ -82,7 +82,7 @@ public ChangelogSplitSource(
{
requireNonNull(session, "session is null");
requireNonNull(typeManager, "typeManager is null");
- this.columnHandles = getColumns(table.schema(), table.spec(), typeManager);
+ this.columnHandles = getColumns(session, table.schema(), table.spec(), typeManager);
this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session);
this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes();
this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java
index 0866f26ceb156..9e13a5bccea50 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java
@@ -197,7 +197,7 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec
tableHandle.getIcebergTableName(),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
- getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
+ getColumns(session, icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
diff --git a/presto-native-execution/Makefile b/presto-native-execution/Makefile
index 9397a4c632daa..a0184dde77199 100644
--- a/presto-native-execution/Makefile
+++ b/presto-native-execution/Makefile
@@ -114,7 +114,7 @@ release: #: Build the release version
$(MAKE) cmake BUILD_DIR=release BUILD_TYPE=Release && \
$(MAKE) build BUILD_DIR=release
-cmake-and-build: #: cmake and build without updating submodules which requires git
+cmake-and-build: #: cmake and build without updating submodules which requires git
cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS) && \
cmake --build $(BUILD_BASE_DIR)/$(BUILD_DIR) -j $(NUM_THREADS)
diff --git a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp
index e2d5891ebf0d6..34c67caa9fed3 100644
--- a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp
+++ b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp
@@ -395,7 +395,8 @@ HivePrestoToVeloxConnector::toVeloxTableHandle(
std::unique_ptr
HivePrestoToVeloxConnector::toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
- const TypeParser& typeParser) const {
+ const TypeParser& typeParser,
+ memory::MemoryPool* pool) const {
auto hiveOutputTableHandle =
std::dynamic_pointer_cast(
createHandle->handle.connectorHandle);
@@ -423,7 +424,8 @@ HivePrestoToVeloxConnector::toVeloxInsertTableHandle(
std::unique_ptr
HivePrestoToVeloxConnector::toVeloxInsertTableHandle(
const protocol::InsertHandle* insertHandle,
- const TypeParser& typeParser) const {
+ const TypeParser& typeParser, const {
+ velox::memory::MemoryPool* pool)
auto hiveInsertTableHandle =
std::dynamic_pointer_cast(
insertHandle->handle.connectorHandle);
diff --git a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h
index fdbc4fc6d4f6c..c91e5d5b0573a 100644
--- a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h
+++ b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h
@@ -46,12 +46,14 @@ class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector {
std::unique_ptr
toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
- const TypeParser& typeParser) const final;
+ const TypeParser& typeParser,
+ velox::memory::MemoryPool* pool) const final;
std::unique_ptr
toVeloxInsertTableHandle(
const protocol::InsertHandle* insertHandle,
- const TypeParser& typeParser) const final;
+ const TypeParser& typeParser,
+ velox::memory::MemoryPool* pool) const final;
std::unique_ptr
createVeloxPartitionFunctionSpec(
diff --git a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp
index d1ff56b3c7585..a6dc171589b14 100644
--- a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp
+++ b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp
@@ -243,11 +243,29 @@ IcebergPrestoToVeloxConnector::toVeloxColumnHandle(
velox::type::fbhive::HiveTypeParser hiveTypeParser;
auto type = stringToType(icebergColumn->type, typeParser);
+ std::function
+ collectNestedField = [&](const protocol::iceberg::ColumnIdentity* column)
+ -> connector::hive::iceberg::IcebergNestedField {
+ std::vector children;
+ if (!column->children.empty()) {
+ children.reserve(column->children.size());
+ for (const auto& child : column->children) {
+ children.push_back(collectNestedField(&child));
+ }
+ }
+ auto type = stringToType(icebergColumn->type, typeParser);
+ return connector::hive::iceberg::IcebergNestedField(column->id, children);
+ };
+
+ auto nestedField = collectNestedField(&icebergColumn->columnIdentity);
+
return std::make_unique(
icebergColumn->columnIdentity.name,
toHiveColumnType(icebergColumn->columnType),
type,
toParquetField(icebergColumn->columnIdentity),
+ nestedField,
toRequiredSubfields(icebergColumn->requiredSubfields));
}
@@ -407,6 +425,152 @@ IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
toFileCompressionKind(icebergInsertTableHandle->compressionCodec)));
}
+std::unique_ptr
+IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
+ const protocol::CreateHandle* createHandle,
+ const TypeParser& typeParser,
+ velox::memory::MemoryPool* pool) const {
+ auto icebergOutputTableHandle =
+ std::dynamic_pointer_cast(
+ createHandle->handle.connectorHandle);
+
+ VELOX_CHECK_NOT_NULL(
+ icebergOutputTableHandle,
+ "Unexpected output table handle type {}",
+ createHandle->handle.connectorHandle->_type);
+
+ const auto inputColumns =
+ toIcebergColumns(icebergOutputTableHandle->inputColumns, typeParser);
+
+ auto sortedBy = toIcebergSortingColumns(
+ icebergOutputTableHandle->sortOrder, icebergOutputTableHandle->schema);
+
+ return std::make_unique<
+ velox::connector::hive::iceberg::IcebergInsertTableHandle>(
+ inputColumns,
+ std::make_shared(
+ fmt::format("{}/data", icebergOutputTableHandle->outputPath),
+ fmt::format("{}/data", icebergOutputTableHandle->outputPath),
+ connector::hive::LocationHandle::TableType::kNew),
+ toVeloxIcebergPartitionSpec(
+ icebergOutputTableHandle->partitionSpec, typeParser),
+ pool,
+ toVeloxFileFormat(icebergOutputTableHandle->fileFormat),
+ std::move(sortedBy),
+ std::optional(
+ toFileCompressionKind(icebergOutputTableHandle->compressionCodec)));
+}
+
+std::vector
+IcebergPrestoToVeloxConnector::toIcebergSortingColumns(
+ protocol::List sortFields,
+ const protocol::iceberg::PrestoIcebergSchema& schema) const {
+ std::vector sortedBy;
+ sortedBy.reserve(sortFields.size());
+ for (const auto& sortField : sortFields) {
+ velox::core::SortOrder veloxSortOrder(
+ sortField.sortOrder == protocol::SortOrder::ASC_NULLS_LAST ||
+ sortField.sortOrder == protocol::SortOrder::ASC_NULLS_FIRST,
+ sortField.sortOrder == protocol::SortOrder::DESC_NULLS_FIRST ||
+ sortField.sortOrder == protocol::SortOrder::ASC_NULLS_FIRST);
+
+ for (const auto& column : schema.columns) {
+ if (column.id == sortField.sourceColumnId) {
+ sortedBy.emplace_back(
+ connector::hive::iceberg::IcebergSortingColumn(
+ column.name, veloxSortOrder));
+ break;
+ }
+ }
+ }
+ return sortedBy;
+}
+
+std::unique_ptr
+IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
+ const protocol::InsertHandle* insertHandle,
+ const TypeParser& typeParser,
+ velox::memory::MemoryPool* pool) const {
+ auto icebergInsertTableHandle =
+ std::dynamic_pointer_cast(
+ insertHandle->handle.connectorHandle);
+
+ VELOX_CHECK_NOT_NULL(
+ icebergInsertTableHandle,
+ "Unexpected insert table handle type {}",
+ insertHandle->handle.connectorHandle->_type);
+
+ const auto inputColumns =
+ toIcebergColumns(icebergInsertTableHandle->inputColumns, typeParser);
+
+ auto sortedBy = toIcebergSortingColumns(
+ icebergInsertTableHandle->sortOrder, icebergInsertTableHandle->schema);
+
+ return std::make_unique(
+ inputColumns,
+ std::make_shared(
+ fmt::format("{}/data", icebergInsertTableHandle->outputPath),
+ fmt::format("{}/data", icebergInsertTableHandle->outputPath),
+ connector::hive::LocationHandle::TableType::kExisting),
+ toVeloxIcebergPartitionSpec(
+ icebergInsertTableHandle->partitionSpec, typeParser),
+ pool,
+ toVeloxFileFormat(icebergInsertTableHandle->fileFormat),
+ std::move(sortedBy),
+ std::optional(
+ toFileCompressionKind(icebergInsertTableHandle->compressionCodec)));
+}
+
+std::vector<
+ std::shared_ptr>
+IcebergPrestoToVeloxConnector::toIcebergColumns(
+ const protocol::List& inputColumns,
+ const TypeParser& typeParser) const {
+ std::vector<
+ std::shared_ptr>
+ icebergColumns;
+ icebergColumns.reserve(inputColumns.size());
+ for (const auto& columnHandle : inputColumns) {
+ icebergColumns.emplace_back(
+ std::dynamic_pointer_cast<
+ connector::hive::iceberg::IcebergColumnHandle>(
+ std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser))));
+ }
+ return icebergColumns;
+}
+
+connector::hive::iceberg::IcebergPartitionSpec::Field
+IcebergPrestoToVeloxConnector::toVeloxIcebergPartitionField(
+ const protocol::iceberg::IcebergPartitionField& field,
+ const facebook::presto::TypeParser& typeParser,
+ const protocol::iceberg::PrestoIcebergSchema& schema) const {
+ std::string type;
+ for (const auto& column : schema.columns) {
+ if (column.name == field.name) {
+ type = column.prestoType;
+ }
+ }
+ return connector::hive::iceberg::IcebergPartitionSpec::Field(
+ field.name,
+ stringToType(type, typeParser),
+ static_cast(field.transform),
+ field.parameter ? *field.parameter : std::optional());
+}
+
+std::unique_ptr
+IcebergPrestoToVeloxConnector::toVeloxIcebergPartitionSpec(
+ const protocol::iceberg::PrestoIcebergPartitionSpec& spec,
+ const facebook::presto::TypeParser& typeParser) const {
+ std::vector fields;
+ fields.reserve(spec.fields.size());
+ for (auto field : spec.fields) {
+ fields.emplace_back(
+ toVeloxIcebergPartitionField(field, typeParser, spec.schema));
+ }
+ return std::make_unique(
+ spec.specId, fields);
+}
+
std::vector
IcebergPrestoToVeloxConnector::toIcebergColumns(
const protocol::List& inputColumns,
diff --git a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h
index 193efa74d6008..b6c2d1b9298f3 100644
--- a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h
+++ b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h
@@ -16,8 +16,9 @@
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
#include "presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h"
-
#include "velox/connectors/hive/iceberg/IcebergColumnHandle.h"
+#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
+#include "velox/connectors/hive/iceberg/PartitionSpec.h"
namespace facebook::presto {
@@ -58,12 +59,40 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::InsertHandle* insertHandle,
const TypeParser& typeParser) const final;
+ std::unique_ptr
+ toVeloxInsertTableHandle(
+ const protocol::CreateHandle* createHandle,
+ const TypeParser& typeParser,
+ velox::memory::MemoryPool* pool) const final;
+
+ std::unique_ptr
+ toVeloxInsertTableHandle(
+ const protocol::InsertHandle* insertHandle,
+ const TypeParser& typeParser,
+ velox::memory::MemoryPool* pool) const final;
+
private:
std::vector
toIcebergColumns(
const protocol::List&
inputColumns,
const TypeParser& typeParser) const;
+
+ std::vector
+ toIcebergSortingColumns(
+ protocol::List,
+ const protocol::iceberg::PrestoIcebergSchema& schema) const;
+
+ velox::connector::hive::iceberg::IcebergPartitionSpec::Field
+ toVeloxIcebergPartitionField(
+ const protocol::iceberg::IcebergPartitionField& filed,
+ const facebook::presto::TypeParser& typeParser,
+ const protocol::iceberg::PrestoIcebergSchema& schema) const;
+
+ std::unique_ptr
+ toVeloxIcebergPartitionSpec(
+ const protocol::iceberg::PrestoIcebergPartitionSpec& spec,
+ const TypeParser& typeParser) const;
};
} // namespace facebook::presto
diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h
index 43988195634db..e60ff3c2d575d 100644
--- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h
+++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h
@@ -77,7 +77,8 @@ class PrestoToVeloxConnector {
velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
- const TypeParser& typeParser) const {
+ const TypeParser& typeParser,
+ velox::memory::MemoryPool* pool) const {
return {};
}
@@ -85,7 +86,8 @@ class PrestoToVeloxConnector {
velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::InsertHandle* insertHandle,
- const TypeParser& typeParser) const {
+ const TypeParser& typeParser,
+ velox::memory::MemoryPool* pool) const {
return {};
}
diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp
index ba8dc6913bd90..5861e11de4a83 100644
--- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp
+++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp
@@ -1563,8 +1563,8 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan(
connectorId = createHandle->handle.connectorId;
auto& connector =
getPrestoToVeloxConnector(createHandle->handle.connectorHandle->_type);
- auto veloxHandle =
- connector.toVeloxInsertTableHandle(createHandle.get(), typeParser_);
+ auto veloxHandle = connector.toVeloxInsertTableHandle(
+ createHandle.get(), typeParser_, pool_);
connectorInsertHandle = std::shared_ptr(std::move(veloxHandle));
} else if (
const auto insertHandle =
@@ -1573,8 +1573,8 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan(
connectorId = insertHandle->handle.connectorId;
auto& connector =
getPrestoToVeloxConnector(insertHandle->handle.connectorHandle->_type);
- auto veloxHandle =
- connector.toVeloxInsertTableHandle(insertHandle.get(), typeParser_);
+ auto veloxHandle = connector.toVeloxInsertTableHandle(
+ insertHandle.get(), typeParser_, pool_);
connectorInsertHandle = std::shared_ptr(std::move(veloxHandle));
}